model_file_manager.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837
  1. import asyncio
  2. from concurrent.futures import ProcessPoolExecutor
  3. from functools import partial
  4. import glob
  5. from itertools import chain
  6. import logging
  7. from pathlib import Path
  8. import platform
  9. import time
  10. import threading
  11. from typing import Dict, Tuple
  12. from filelock import Timeout
  13. from modelscope.hub.constants import TEMPORARY_FOLDER_NAME, API_FILE_DOWNLOAD_CHUNK_SIZE
  14. from multiprocessing import Manager, cpu_count
  15. from huggingface_hub._local_folder import get_local_download_paths
  16. from huggingface_hub.file_download import get_hf_file_metadata, hf_hub_url
  17. import huggingface_hub.constants
  18. from huggingface_hub.utils import build_hf_headers
  19. from gpustack.api.exceptions import NotFoundException
  20. from gpustack.config.config import Config
  21. from gpustack.logging import setup_logging
  22. from gpustack.schemas.model_files import ModelFile, ModelFileUpdate, ModelFileStateEnum
  23. from gpustack.client import ClientSet
  24. from gpustack.schemas.models import SourceEnum
  25. from gpustack.server.bus import Event, EventType
  26. from gpustack.utils import hub
  27. from gpustack.utils.file import delete_path, get_local_file_size_in_byte
  28. from gpustack.worker import downloaders
  29. from gpustack.config.registration import read_worker_token
  30. from gpustack.utils.locks import read_lock_info, get_lock_path
  31. logger = logging.getLogger(__name__)
  32. max_concurrent_downloads = 5
  33. def _cleanup_download_log(config_log_dir, model_file_id):
  34. """
  35. Clean up the download log file
  36. """
  37. try:
  38. log_dir = Path(config_log_dir) / "serve"
  39. download_log_file_path = log_dir / f"model_file_{model_file_id}.download.log"
  40. if not download_log_file_path.exists():
  41. return
  42. download_log_file_path.unlink()
  43. logger.debug(f"Cleaned up download log file: {download_log_file_path}")
  44. except Exception as e:
  45. logger.warning(
  46. f"Failed to clean up download log file for model file {model_file_id}: {e}"
  47. )
  48. class ModelFileManager:
  49. def __init__(
  50. self,
  51. worker_id: int,
  52. clientset: ClientSet,
  53. cfg: Config,
  54. ):
  55. self._worker_id = worker_id
  56. self._config = cfg
  57. self._clientset = clientset
  58. self._active_downloads: Dict[int, Tuple] = {}
  59. self._download_pool = None
  60. async def watch_model_files(self):
  61. self._prerun()
  62. while True:
  63. try:
  64. logger.debug("Started watching model files.")
  65. await self._clientset.model_files.awatch(
  66. callback=self._handle_model_file_event
  67. )
  68. except asyncio.CancelledError:
  69. break
  70. except Exception as e:
  71. logger.error(f"Failed to watch model files: {e}")
  72. await asyncio.sleep(5)
  73. def _prerun(self):
  74. self._mp_manager = Manager()
  75. self._download_pool = ProcessPoolExecutor(
  76. max_workers=min(max_concurrent_downloads, cpu_count()),
  77. )
  78. def _handle_model_file_event(self, event: Event):
  79. mf = ModelFile.model_validate(event.data)
  80. if mf.worker_id != self._worker_id:
  81. # Ignore model files that are not assigned to this worker.
  82. return
  83. logger.trace(f"Received model file event: {event.type} {mf.id} {mf.state}")
  84. if event.type == EventType.DELETED:
  85. asyncio.create_task(self._handle_deletion(mf))
  86. elif event.type in {EventType.CREATED, EventType.UPDATED}:
  87. if mf.state != ModelFileStateEnum.DOWNLOADING:
  88. return
  89. self._create_download_task(mf)
  90. def _update_model_file(self, id: int, **kwargs):
  91. model_file_public = self._clientset.model_files.get(id=id)
  92. model_file_update = ModelFileUpdate(**model_file_public.model_dump())
  93. for key, value in kwargs.items():
  94. setattr(model_file_update, key, value)
  95. self._clientset.model_files.update(id=id, model_update=model_file_update)
  96. async def _handle_deletion(self, model_file: ModelFile):
  97. entry = self._active_downloads.pop(model_file.id, None)
  98. if entry:
  99. future, cancel_flag = entry
  100. cancel_flag.set()
  101. future.cancel()
  102. try:
  103. await asyncio.wrap_future(future)
  104. except (asyncio.CancelledError, NotFoundException):
  105. pass
  106. except Exception as e:
  107. logger.error(
  108. f"Error while cancelling download for {model_file.readable_source}(id: {model_file.id}): {e}"
  109. )
  110. finally:
  111. logger.info(
  112. f"Cancelled download for deleted model: {model_file.readable_source}(id: {model_file.id})"
  113. )
  114. if model_file.cleanup_on_delete:
  115. await self._delete_model_file(model_file)
  116. async def get_hf_file_metadata(self, model_file: ModelFile, filename: str):
  117. token = self._config.huggingface_token
  118. url = hf_hub_url(model_file.huggingface_repo_id, filename)
  119. headers = build_hf_headers(token=token)
  120. metadata = await asyncio.to_thread(
  121. get_hf_file_metadata,
  122. url=url,
  123. timeout=huggingface_hub.constants.DEFAULT_ETAG_TIMEOUT,
  124. headers=headers,
  125. token=token,
  126. )
  127. return metadata
  128. async def _get_incomplete_model_files( # noqa: C901
  129. self, model_file: ModelFile
  130. ) -> set:
  131. """
  132. Finds cached files of models being downloaded.
  133. 1.For models from Hugging Face, their .incomplete filenames are encoded. The process requires:
  134. [filename_pattern → model_name → etag → incomplete_pattern → .incomplete_filename] to ultimately confirm the file.
  135. 2.For models from ModelScope, the incomplete files are stored in a temporary folder.
  136. we just need to find them by the filename pattern.
  137. """
  138. paths_to_delete = set()
  139. try:
  140. if model_file.source == SourceEnum.HUGGING_FACE:
  141. if not model_file.huggingface_filename:
  142. # The resolved_paths in vLLM model points to entire dir of cache, delete it directly
  143. paths_to_delete.update(model_file.resolved_paths)
  144. return paths_to_delete
  145. for path in model_file.resolved_paths:
  146. path_obj = Path(str(path))
  147. filename_pattern = path_obj.name
  148. local_dir = path_obj.parent
  149. download_paths = get_local_download_paths(
  150. local_dir, filename_pattern
  151. )
  152. cache_dir = download_paths.lock_path.parent
  153. filename = ""
  154. # Get actual filename by pattern
  155. for cache_file in await asyncio.to_thread(
  156. glob.glob, str(cache_dir / filename_pattern) + "*"
  157. ):
  158. # cut off the path and useless extension
  159. filename = cache_file.rsplit("/", 1)[-1]
  160. filename = filename.rsplit(".", 1)[0]
  161. break
  162. metadata = await self.get_hf_file_metadata(model_file, filename)
  163. # Collect lock files and incomplete files
  164. paths_to_delete.add(str(cache_dir / (filename + ".lock")))
  165. paths_to_delete.add(str(cache_dir / (filename + ".metadata")))
  166. for item_path_str in await asyncio.to_thread(
  167. glob.glob, str(cache_dir / f"*.{metadata.etag}.incomplete")
  168. ):
  169. paths_to_delete.add(item_path_str)
  170. elif model_file.source == SourceEnum.MODEL_SCOPE:
  171. if not model_file.model_scope_file_path:
  172. # The resolved_paths in vLLM model points to entire dir of cache, delete it directly
  173. paths_to_delete.update(model_file.resolved_paths)
  174. return paths_to_delete
  175. for path in model_file.resolved_paths:
  176. path_obj = Path(str(path))
  177. filename_pattern = path_obj.name
  178. local_dir = path_obj.parent
  179. for delete_file in await asyncio.to_thread(
  180. glob.glob,
  181. str(local_dir / f"{TEMPORARY_FOLDER_NAME}/{filename_pattern}"),
  182. ):
  183. paths_to_delete.add(delete_file)
  184. except Exception as e:
  185. logger.error(
  186. f"Error deleting incomplete Download files for "
  187. f"file '{filename}': {e}"
  188. )
  189. return paths_to_delete
  190. async def _delete_incomplete_model_files(self, model_file: ModelFile):
  191. paths_to_delete = await self._get_incomplete_model_files(model_file)
  192. for delete_file in paths_to_delete:
  193. logger.info(f"Attempting to delete incomplete file: {delete_file}")
  194. await asyncio.to_thread(delete_path, delete_file)
  195. async def _delete_model_file(self, model_file: ModelFile):
  196. try:
  197. if model_file.resolved_paths:
  198. paths = chain.from_iterable(
  199. glob.glob(p) if '*' in p else [p] for p in model_file.resolved_paths
  200. )
  201. for path in paths:
  202. delete_path(path)
  203. await self._delete_incomplete_model_files(model_file)
  204. # Clean up download log file when deleting model file
  205. _cleanup_download_log(self._config.log_dir, model_file.id)
  206. logger.info(
  207. f"Deleted model file {model_file.readable_source}(id: {model_file.id}) from disk"
  208. )
  209. except Exception as e:
  210. logger.error(
  211. f"Failed to delete {model_file.readable_source}(id: {model_file.id}: {e}"
  212. )
  213. await self._update_model_file(
  214. model_file.id,
  215. state=ModelFileStateEnum.ERROR,
  216. state_message=f"Deletion failed: {str(e)}",
  217. )
  218. def _create_download_task(self, model_file: ModelFile):
  219. if model_file.id in self._active_downloads:
  220. return
  221. cancel_flag = self._mp_manager.Event()
  222. download_task = ModelFileDownloadTask(model_file, self._config, cancel_flag)
  223. future = self._download_pool.submit(download_task.run)
  224. self._active_downloads[model_file.id] = (future, cancel_flag)
  225. logger.debug(f"Created download task for {model_file.readable_source}")
  226. async def _check_completion():
  227. try:
  228. await asyncio.wrap_future(future)
  229. except NotFoundException:
  230. logger.info(
  231. f"Model file {model_file.readable_source} not found. Maybe it was cancelled."
  232. )
  233. except Exception as e:
  234. logger.error(f"Failed to download model file: {e}")
  235. await self._update_model_file(
  236. model_file.id,
  237. state=ModelFileStateEnum.ERROR,
  238. state_message=str(e),
  239. )
  240. finally:
  241. self._active_downloads.pop(model_file.id, None)
  242. logger.debug(f"Download completed for {model_file.readable_source}")
  243. asyncio.create_task(_check_completion())
  244. class ModelFileDownloadTask:
  245. def __init__(self, model_file: ModelFile, cfg: Config, cancel_flag):
  246. self._model_file = model_file
  247. self._config = cfg
  248. self._cancel_flag = cancel_flag
  249. # Store download log file paths for related model instances
  250. self._instance_download_log_file = None
  251. self._download_completed = False
  252. # Time control for log updates
  253. self._last_log_update_time = 0
  254. self._log_update_interval = 2.0 # 2 seconds interval
  255. # Multi-file progress tracking with ANSI cursor control
  256. # Counter for generating unique tqdm IDs
  257. self._tqdm_counter = 0
  258. # Dict[tqdm_id, line_number] - tracks which line each file occupies
  259. self._file_line_mapping = {}
  260. # Dict[tqdm_id, {'last_update_time': float, 'last_progress': float}]
  261. self._file_progress_tracking = {}
  262. self._tqdm_file_basename = {}
  263. # Number of header lines in the log file
  264. self._log_header_lines = 1
  265. self._resume_threshold = 0
  266. if self._model_file.source == SourceEnum.MODEL_SCOPE:
  267. self._resume_threshold = API_FILE_DOWNLOAD_CHUNK_SIZE
  268. elif self._model_file.source == SourceEnum.HUGGING_FACE:
  269. self._resume_threshold = huggingface_hub.constants.DOWNLOAD_CHUNK_SIZE
  270. def prerun(self):
  271. setup_logging(self._config.debug)
  272. self._clientset = ClientSet(
  273. base_url=self._config.get_server_url(),
  274. api_key=read_worker_token(self._config.data_dir),
  275. )
  276. self._download_start_time = time.time()
  277. self._ensure_model_file_size_and_paths()
  278. self._speed_lock = threading.Lock()
  279. # Lock for _model_downloaded_size/_last_download_update_time/_last_downloaded_size to avoid race condition
  280. self._model_downloaded_size = 0
  281. self._last_download_update_time = 0
  282. self._last_downloaded_size = 0
  283. self._setup_instance_log_files()
  284. logger.debug(f"Initializing task for {self._model_file.readable_source}")
  285. self._update_progress_func = partial(
  286. self._update_model_file_progress, self._model_file.id
  287. )
  288. self._model_file_size = self._model_file.size
  289. self.hijack_tqdm_progress()
  290. def _setup_instance_log_files(self):
  291. try:
  292. log_dir = Path(self._config.log_dir) / "serve"
  293. # Use model file ID for shared download log across all instances using the same model file
  294. download_log_file_path = (
  295. log_dir / f"model_file_{self._model_file.id}.download.log"
  296. )
  297. # Delete existing download log file to avoid reading previous download logs
  298. # when redeploying the same model after deleting model_instance but keeping model_file
  299. if download_log_file_path.exists():
  300. try:
  301. download_log_file_path.unlink()
  302. logger.debug(
  303. f"Deleted existing download log file: {download_log_file_path}"
  304. )
  305. except Exception as e:
  306. logger.warning(
  307. f"Failed to delete existing download log file {download_log_file_path}: {e}"
  308. )
  309. self._instance_download_log_file = str(download_log_file_path)
  310. logger.debug(f"Setup shared download log file: {download_log_file_path}")
  311. except Exception as e:
  312. logger.warning(f"Failed to setup instance download log files: {e}")
  313. def _write_log_with_windows_lock(self, log_file_path: str, log_message: str):
  314. """
  315. Write log message to file using Windows msvcrt file locking
  316. """
  317. try:
  318. import msvcrt
  319. except ImportError:
  320. # msvcrt not available, fallback to basic write
  321. self._write_log_without_lock(log_file_path, log_message)
  322. return
  323. with open(log_file_path, 'a', encoding='utf-8') as f:
  324. try:
  325. # Acquire exclusive lock on the file
  326. # Lock a single byte at the beginning of the file for coordination
  327. f.seek(0)
  328. msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, 1)
  329. f.seek(0, 2) # Move to end of file for appending
  330. f.write(log_message)
  331. f.flush() # Ensure immediate write to disk
  332. except (OSError, IOError):
  333. # If locking fails, fallback to basic write
  334. f.seek(0, 2) # Move to end of file for appending
  335. f.write(log_message)
  336. f.flush()
  337. finally:
  338. try:
  339. f.seek(0)
  340. msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
  341. except (OSError, IOError):
  342. pass # Ignore unlock errors
  343. def _write_log_with_unix_lock(self, log_file_path: str, log_message: str):
  344. """
  345. Write log message to file using Unix/Linux fcntl file locking
  346. """
  347. try:
  348. import fcntl
  349. except ImportError:
  350. # fcntl not available, fallback to basic write
  351. self._write_log_without_lock(log_file_path, log_message)
  352. return
  353. with open(log_file_path, 'a', encoding='utf-8') as f:
  354. try:
  355. # Acquire exclusive lock on the file
  356. fcntl.flock(f.fileno(), fcntl.LOCK_EX)
  357. f.write(log_message)
  358. f.flush() # Ensure immediate write to disk
  359. finally:
  360. fcntl.flock(f.fileno(), fcntl.LOCK_UN)
  361. def _write_log_without_lock(self, log_file_path: str, log_message: str):
  362. """
  363. Write log message to file without file locking (fallback method)
  364. """
  365. try:
  366. with open(log_file_path, 'a', encoding='utf-8') as f:
  367. f.write(log_message)
  368. f.flush() # Ensure immediate write to disk
  369. except Exception as e:
  370. logger.warning(
  371. f"Failed to write to instance download log {log_file_path}: {e}"
  372. )
  373. def _write_to_instance_download_logs(
  374. self, message: str, is_error=False, use_tqdm_format=False
  375. ):
  376. """
  377. Write download log message to all associated model instance download log files
  378. Skip writing if download is completed to avoid unnecessary logs
  379. """
  380. if not self._instance_download_log_file:
  381. return
  382. if use_tqdm_format:
  383. # For tqdm-style progress with ANSI control sequences
  384. if message.startswith('\033[') or message.startswith('\r\033['):
  385. # This is an ANSI control message, write it directly without additional formatting
  386. log_message = message
  387. else:
  388. # Regular tqdm message without timestamp
  389. log_message = f"{message}\n"
  390. else:
  391. timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
  392. log_level = "ERROR" if is_error else "INFO"
  393. log_message = f"[{timestamp}] [{log_level}] {message}\n"
  394. # Increment header lines counter for non-tqdm messages
  395. self._log_header_lines += 1
  396. # Determine file locking mechanism based on platform
  397. is_windows = platform.system() == 'Windows'
  398. # Ensure log directory exists
  399. Path(self._instance_download_log_file).parent.mkdir(parents=True, exist_ok=True)
  400. # Use appropriate locking method based on platform
  401. if is_windows:
  402. self._write_log_with_windows_lock(
  403. self._instance_download_log_file, log_message
  404. )
  405. else:
  406. self._write_log_with_unix_lock(
  407. self._instance_download_log_file, log_message
  408. )
  409. def run(self):
  410. try:
  411. self.prerun()
  412. self._write_to_instance_download_logs(
  413. f"Model file download task started: {self._model_file.readable_source}"
  414. )
  415. self._download_model_file()
  416. self._write_to_instance_download_logs(
  417. f"Model file download task completed successfully: {self._model_file.readable_source}"
  418. )
  419. except asyncio.CancelledError:
  420. self._write_to_instance_download_logs(
  421. f"Download task cancelled: {self._model_file.readable_source}"
  422. )
  423. except Timeout:
  424. lock_path = get_lock_path(self._config.cache_dir, self._model_file)
  425. info = read_lock_info(lock_path) if lock_path else None
  426. owner_id = info.get("worker_id") if info else None
  427. current_worker_id = self._model_file.worker_id
  428. if owner_id is None or owner_id != current_worker_id:
  429. logger.warning(
  430. f"Download model {self._model_file.readable_source} timed out: "
  431. f"lock held by other worker, please try again later."
  432. )
  433. return
  434. logger.warning(
  435. f"Download model {self._model_file.readable_source} timed out waiting to acquire the lock. "
  436. f"There might be another download task currently downloading the same model to the same disk directory."
  437. )
  438. except Exception as e:
  439. self._write_to_instance_download_logs(
  440. f"Download task failed: {self._model_file.readable_source} - {str(e)}",
  441. is_error=True,
  442. )
  443. self._update_model_file(
  444. self._model_file.id,
  445. state=ModelFileStateEnum.ERROR,
  446. state_message=str(e),
  447. )
  448. def _download_model_file(self):
  449. self._write_to_instance_download_logs(
  450. f"Downloading model file: {self._model_file.readable_source}"
  451. )
  452. model_paths = downloaders.download_model(
  453. self._model_file,
  454. local_dir=self._model_file.local_dir,
  455. cache_dir=self._config.cache_dir,
  456. huggingface_token=self._config.huggingface_token,
  457. )
  458. self._download_completed = True
  459. self._update_model_file(
  460. self._model_file.id,
  461. state=ModelFileStateEnum.READY,
  462. download_progress=100,
  463. resolved_paths=model_paths,
  464. )
  465. self._write_to_instance_download_logs(
  466. f"Successfully downloaded {self._model_file.readable_source}"
  467. )
  468. def hijack_tqdm_progress(task_self):
  469. """
  470. Monkey patch the tqdm progress bar to update the model instance download progress.
  471. tqdm is used by hf_hub_download under the hood.
  472. """
  473. from tqdm import tqdm
  474. _original_init = (
  475. tqdm._original_init if hasattr(tqdm, "_original_init") else tqdm.__init__
  476. )
  477. _original_update = (
  478. tqdm._original_update if hasattr(tqdm, "_original_update") else tqdm.update
  479. )
  480. def _new_init(self: tqdm, *args, **kwargs):
  481. task_self._handle_tqdm_init(self, _original_init, *args, **kwargs)
  482. def _new_update(self: tqdm, n=1):
  483. task_self._handle_tqdm_update(self, _original_update, n)
  484. tqdm.__init__ = _new_init
  485. tqdm.update = _new_update
  486. tqdm._original_init = _original_init
  487. tqdm._original_update = _original_update
  488. def _handle_tqdm_init(self, tqdm_instance, original_init, *args, **kwargs):
  489. kwargs["disable"] = False # enable the progress bar anyway
  490. original_init(tqdm_instance, *args, **kwargs)
  491. # Assign unique ID and line number for this tqdm instance
  492. tqdm_id = self._tqdm_counter
  493. self._tqdm_counter += 1
  494. tqdm_instance._gpustack_id = tqdm_id
  495. # Assign a fixed line number for this file (same as tqdm_id)
  496. line_number = tqdm_id
  497. self._file_line_mapping[tqdm_id] = line_number
  498. # Initialize progress tracking for this file
  499. self._file_progress_tracking[tqdm_id] = {
  500. 'last_update_time': 0,
  501. 'last_progress': 0.0,
  502. }
  503. if hasattr(self, '_model_file_size'):
  504. # Resume downloading
  505. self._model_downloaded_size += tqdm_instance.n
  506. # Write initial progress line for this file using ANSI cursor positioning
  507. file_desc = getattr(tqdm_instance, 'desc', None) or f"File {tqdm_id}"
  508. self._assign_file_basename(tqdm_id, file_desc)
  509. self._write_progress_with_cursor_positioning(
  510. line_number, f"{file_desc}: Initializing...", tqdm_id
  511. )
    def _handle_tqdm_update(self, tqdm_instance, original_update, n=1):
        """
        Hooked ``tqdm.update``: advance the bar, honour cancellation and report
        progress.

        Args:
            tqdm_instance: the bar being updated.
            original_update: the unpatched ``tqdm.update``.
            n: increment reported by the downloader (may be corrected when it
                exceeds ``_resume_threshold``).

        Raises:
            asyncio.CancelledError: when the task's cancel flag is set.
            Exception: wrapping any error raised while reporting progress
                (e.g. the server update), which aborts the download.
        """
        # Get the tqdm ID and line number for this instance
        tqdm_id = getattr(tqdm_instance, '_gpustack_id', None)
        # NOTE(review): ids start at 0 (see _handle_tqdm_init), and `not tqdm_id`
        # is also true for 0, so the first bar of a task is ignored entirely -
        # original_update is not even called for it. Confirm this is intended
        # (e.g. an outer "files" bar) rather than a `tqdm_id is None` check.
        if not tqdm_id or tqdm_id not in self._file_line_mapping:
            return
        if self._resume_threshold and n > self._resume_threshold:
            # https://github.com/modelscope/modelscope/blob/609442d271bd7ed106a0933b1937289be7c1ad01/modelscope/hub/file_download.py#L417-L422
            # During download reconnection events, the progress bar may recalculate based on the current downloaded size.
            # We need to intercept this behavior and read the actual cached file size to correct the progress display.
            n = self._adjust_downloaded_by_cache_size(tqdm_instance, n)
        original_update(tqdm_instance, n)
        # Cancellation is polled here; the flag is set by
        # ModelFileManager._handle_deletion when the model file is deleted.
        if self._cancel_flag.is_set():
            raise asyncio.CancelledError("Download cancelled")
        line_number = self._file_line_mapping[tqdm_id]
        with self._speed_lock:
            self._model_downloaded_size += n
            try:
                # Overall progress in percent across all files of the model
                progress = round(
                    (self._model_downloaded_size / self._model_file_size) * 100, 2
                )
                current_time = time.time()
                # Get file-specific progress tracking info
                file_tracking = self._file_progress_tracking.get(
                    tqdm_id, {'last_update_time': 0, 'last_progress': 0.0}
                )
                # Calculate individual file progress percentage
                if tqdm_instance.total and tqdm_instance.total > 0:
                    file_progress = (tqdm_instance.n / tqdm_instance.total) * 100
                else:
                    file_progress = 0.0
                # Log when _log_update_interval seconds have passed for this file,
                # or when the file is complete. ('last_progress' is recorded below
                # but not used in this decision.)
                time_elapsed = current_time - file_tracking['last_update_time']
                should_log = (
                    time_elapsed >= self._log_update_interval  # 2 seconds elapsed
                    or file_progress >= 100.0  # Always log when complete
                    or (
                        tqdm_instance.total is not None
                        and tqdm_instance.n >= tqdm_instance.total
                    )  # Always log when download completes
                )
                if should_log:
                    # Update progress to server
                    self._update_progress_func(progress)
                    # Rewrite this file's line using tqdm's string representation
                    progress_str = str(tqdm_instance)
                    self._write_progress_with_cursor_positioning(
                        line_number, progress_str, tqdm_id
                    )
                    # Update file-specific tracking info
                    self._file_progress_tracking[tqdm_id] = {
                        'last_update_time': current_time,
                        'last_progress': file_progress,
                    }
                    # Keep global update time for backward compatibility
                    self._last_log_update_time = current_time
                    if file_progress >= 100.0:
                        # File finished: move the cursor below the progress lines
                        self._recover_cursor_to_end()
            except Exception as e:
                error_msg = f"Failed to update model file: {e}"
                self._write_to_instance_download_logs(
                    f"Download error: {error_msg}", is_error=True
                )
                raise Exception(error_msg)
  577. def _adjust_downloaded_by_cache_size(self, tqdm_instance, n: int) -> int:
  578. try:
  579. actual_size = self._get_cache_file_actual_size(tqdm_instance)
  580. if actual_size is None:
  581. return n
  582. base = tqdm_instance.n or 0
  583. delta = actual_size - base
  584. logger.debug(f"_adjust_downloaded_by_cache_size success, delta = {n}")
  585. return delta if delta > 0 else 0
  586. except Exception:
  587. return n
  588. def _get_cache_file_actual_size(self, tqdm_instance) -> int | None:
  589. try:
  590. source = self._model_file.source
  591. paths = self._model_file.resolved_paths or []
  592. if not paths:
  593. return None
  594. target_basename = None
  595. tid = getattr(tqdm_instance, '_gpustack_id', None)
  596. if tid is not None:
  597. target_basename = self._tqdm_file_basename.get(tid)
  598. if source == SourceEnum.MODEL_SCOPE:
  599. for path in paths:
  600. p = Path(str(path))
  601. if p.is_dir():
  602. size = self._get_incomplete_size_from_dir(
  603. p, target_basename, tqdm_instance
  604. )
  605. if size is not None:
  606. return size
  607. continue
  608. filename_pattern = p.name
  609. local_dir = p.parent
  610. if target_basename and target_basename != filename_pattern:
  611. continue
  612. incomplete_path = (
  613. local_dir / TEMPORARY_FOLDER_NAME / filename_pattern
  614. )
  615. if incomplete_path.exists():
  616. return get_local_file_size_in_byte(str(incomplete_path))
  617. return None
  618. except Exception:
  619. return None
  620. def _get_incomplete_size_from_dir(
  621. self, local_dir: Path, target_basename, tqdm_instance
  622. ) -> int | None:
  623. try:
  624. tb = target_basename
  625. if tb is None:
  626. desc = getattr(tqdm_instance, 'desc', None)
  627. temp_dir = local_dir / TEMPORARY_FOLDER_NAME
  628. if desc and temp_dir.exists():
  629. for f in temp_dir.iterdir():
  630. name = f.name
  631. if name and name in desc:
  632. tb = name
  633. break
  634. if tb:
  635. incomplete_path = local_dir / TEMPORARY_FOLDER_NAME / tb
  636. if incomplete_path.exists():
  637. return get_local_file_size_in_byte(str(incomplete_path))
  638. return None
  639. except Exception:
  640. return None
  641. def _assign_file_basename(self, tqdm_id: int, desc: str | None):
  642. try:
  643. if not desc:
  644. return
  645. paths = self._model_file.resolved_paths or []
  646. for path in paths:
  647. b = Path(str(path)).name
  648. if b and b in desc:
  649. self._tqdm_file_basename[tqdm_id] = b
  650. return
  651. except Exception:
  652. return
  653. def _write_progress_with_cursor_positioning(
  654. self, line_number: int, message: str, tqdm_id: int
  655. ):
  656. """Write progress message to a specific line using ANSI cursor positioning"""
  657. if not self._instance_download_log_file:
  658. return
  659. try:
  660. # Calculate the actual line position in the file
  661. actual_line = line_number + self._log_header_lines
  662. # Create ANSI escape sequence to position cursor at specific line, column 1
  663. cursor_position = f"\033[{actual_line};1H"
  664. # Clear the entire line to remove any residual characters
  665. clear_line = "\033[2K"
  666. # Add timestamp and tqdm_id prefix to the message
  667. timestamp = time.strftime('%H:%M:%S')
  668. formatted_message = (
  669. f"[{timestamp}] [{tqdm_id}]" if tqdm_id > 0 else f"[{timestamp}]"
  670. )
  671. formatted_message = f"{formatted_message} {message}"
  672. # Combine cursor positioning, line clearing, and new content
  673. ansi_message = f"{cursor_position}{clear_line}{formatted_message}\n"
  674. # Write to log file using the existing infrastructure
  675. self._write_to_instance_download_logs(ansi_message, use_tqdm_format=True)
  676. except Exception as e:
  677. logger.warning(
  678. f"Failed to write progress with cursor positioning to line {line_number}: {e}"
  679. )
  680. def _recover_cursor_to_end(self):
  681. """Recover cursor to end of log file"""
  682. max_line_number = (
  683. max(self._file_line_mapping.values()) if self._file_line_mapping else 0
  684. )
  685. line_num = max_line_number + self._log_header_lines + 1
  686. self._write_to_instance_download_logs(
  687. f"\033[{line_num};1H", use_tqdm_format=True # Move cursor to end of file
  688. )
  689. def _ensure_model_file_size_and_paths(self):
  690. if self._model_file.size is not None:
  691. return
  692. repo_file_list = downloaders.get_model_file_info(
  693. self._model_file,
  694. huggingface_token=self._config.huggingface_token,
  695. cache_dir=self._config.cache_dir,
  696. )
  697. (size, file_paths) = hub.match_file_and_calculate_size(
  698. files=repo_file_list,
  699. model=self._model_file,
  700. cache_dir=self._config.cache_dir,
  701. )
  702. self._update_model_file(
  703. self._model_file.id, size=size, resolved_paths=file_paths
  704. )
  705. self._model_file.size = size
  706. self._model_file.resolved_paths = file_paths
  707. def _update_model_file_progress(self, model_file_id: int, progress: float):
  708. self._update_model_file(model_file_id, download_progress=progress)
  709. def _update_model_file(self, id: int, **kwargs):
  710. model_file_public = self._clientset.model_files.get(id=id)
  711. model_file_update = ModelFileUpdate(**model_file_public.model_dump())
  712. for key, value in kwargs.items():
  713. setattr(model_file_update, key, value)
  714. self._clientset.model_files.update(id=id, model_update=model_file_update)