file.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import glob
  2. import os
  3. import re
  4. import shutil
  5. import asyncio
  6. from pathlib import Path
  7. from typing import Callable
  8. from tenacity import retry, stop_after_attempt, wait_fixed
  9. from gpustack.utils import platform
  def get_local_file_size_in_byte(file_path):
      """Return the size in bytes of the file at ``file_path``.

      A symbolic link is first resolved with ``os.path.realpath`` so the size
      reported is that of the file the link ultimately points to.
      """
      resolved = os.path.realpath(file_path) if os.path.islink(file_path) else file_path
      return os.path.getsize(resolved)
  def copy_with_owner(src, dst):
      """Copy the directory tree ``src`` into ``dst``, then mirror its ownership.

      ``dst`` is allowed to exist already (``dirs_exist_ok=True``). Once the
      copy is done, ``copy_owner_recursively`` is applied to the destination.
      """
      shutil.copytree(src=src, dst=dst, dirs_exist_ok=True)
      copy_owner_recursively(src, dst)
  def copy_owner_recursively(src, dst):
      """Give ``dst`` and everything beneath it the owner and group of ``src``.

      Nothing happens unless the project's ``platform.system()`` reports
      ``"linux"`` or ``"darwin"``.
      """
      if platform.system() not in ["linux", "darwin"]:
          return

      # Ownership is read once from the source root and applied uniformly.
      source_stat = os.stat(src)
      uid, gid = source_stat.st_uid, source_stat.st_gid

      os.chown(dst, uid, gid)
      for root, subdirs, files in os.walk(dst):
          # Directories first, then files, for each level of the walk.
          for name in subdirs + files:
              os.chown(os.path.join(root, name), uid, gid)
  @retry(stop=stop_after_attempt(10), wait=wait_fixed(1))
  def check_file_with_retries(path: Path):
      """Raise ``FileNotFoundError`` unless ``path`` exists.

      The ``retry`` decorator re-runs the check when it raises: it stops after
      10 attempts and waits a fixed 1 second between them.
      """
      if os.path.exists(path):
          return
      raise FileNotFoundError(f"Log file not found: {path}")
  async def check_with_retries(checker: Callable, timeout: int = 30, interval: int = 1):
      """Generic async retry wrapper for checking operations.

      The checker is always attempted at least once, even when ``timeout`` is
      zero or negative. After each failure the retry budget is re-evaluated:
      it is exhausted as soon as either the accumulated nominal sleep time
      (``interval`` per failed attempt) or the real time elapsed on the event
      loop clock reaches ``timeout``. Counting real time keeps a slow checker,
      or ``interval=0``, from retrying far beyond ``timeout``.

      Args:
          checker: A callable (sync or async) that performs the check and
              returns a result. It should raise an exception if the check
              fails (which triggers a retry). Any awaitable it returns is
              awaited, so coroutine functions, ``functools.partial`` objects
              and lambdas wrapping coroutines are all supported.
          timeout: Maximum time to wait in seconds (default: 30)
          interval: Time between retries in seconds (default: 1)

      Returns:
          The result from the checker function

      Raises:
          Exception: Whatever exception the checker raises on final failure

      Example:
          def check_files():
              files = get_files()
              if not files:
                  raise FileNotFoundError("No files found")
              return files

          files = await check_with_retries(check_files, timeout=60, interval=1)
      """
      import inspect

      clock = asyncio.get_running_loop().time
      started = clock()
      waited = 0
      while True:
          try:
              outcome = checker()
              # Inspect the returned value rather than the callable so that
              # wrappers around coroutine functions are awaited as well.
              if inspect.isawaitable(outcome):
                  outcome = await outcome
              return outcome
          except Exception:
              waited += interval
              # For a fast checker the nominal counter dominates, giving the
              # same number of attempts as plain interval counting would.
              if max(waited, clock() - started) >= timeout:
                  raise
          await asyncio.sleep(interval)
  def delete_path(path: str):
      """
      Delete a file or directory. If the path is a symbolic link, both the link
      and the path it resolves to are deleted.

      Directories are emptied entry by entry through this same function, so
      symbolic links found inside a directory have their targets deleted too,
      even when those targets live outside the directory. A path that does not
      exist is ignored.
      """
      if not os.path.lexists(path):
          return

      if os.path.islink(path):
          # Resolve before unlinking; afterwards the target can no longer be found.
          target_path = os.path.realpath(path)
          os.unlink(path)
          if os.path.lexists(target_path):
              delete_path(target_path)
      elif os.path.isfile(path):
          os.remove(path)
      elif os.path.isdir(path):
          # Snapshot the entries and close the scandir handle before deleting,
          # so the directory is not modified while it is being iterated.
          with os.scandir(path) as entries:
              children = [entry.path for entry in entries]
          for child in children:
              delete_path(child)
          # A link inside the directory may have pointed back at this directory
          # (or an ancestor), in which case it has already been removed.
          if os.path.isdir(path):
              shutil.rmtree(path)
  def getsize(path: str) -> int:
      """
      Return the total size of ``path`` in bytes, handling symbolic links and
      directories.
      """
      # Each call starts the recursion with a fresh visited-directory set (to
      # avoid infinite loops) and a fresh directory-size cache (to avoid
      # redundant calculations).
      return _getsize(path, set(), {})
  88. def _getsize(path: str, visited: set, cache: dict) -> int:
  89. real_path = os.path.realpath(path)
  90. if os.path.islink(path):
  91. return _getsize(real_path, visited, cache)
  92. elif os.path.isfile(real_path):
  93. return os.path.getsize(real_path)
  94. elif os.path.isdir(real_path):
  95. if real_path in visited:
  96. return 0
  97. visited.add(real_path)
  98. if real_path in cache:
  99. return cache[real_path]
  100. total = 0
  101. with os.scandir(real_path) as entries:
  102. for entry in entries:
  103. try:
  104. total += _getsize(entry.path, visited, cache)
  105. except FileNotFoundError:
  106. pass
  107. cache[real_path] = total
  108. return total
  109. raise FileNotFoundError(f"Path does not exist: {path}")
  def get_sharded_file_paths(file_path: str) -> list:
      """Return the paths of all shards that belong with ``file_path``.

      A shard is a file whose name contains ``<prefix>-NNNNN-of-NNNNN.<ext>``
      (five digits each). When ``file_path`` has such a name, the sorted list
      of files in the same directory sharing its prefix and extension is
      returned; otherwise the result is ``[file_path]``.

      Args:
          file_path: Path to a single file, sharded or not.

      Returns:
          A list of path strings.
      """
      dir_name, base_name = os.path.split(file_path)
      match = re.match(r"(.*?)-\d{5}-of-\d{5}\.(\w+)", base_name)
      if not match:
          return [file_path]

      prefix = match.group(1)
      extension = match.group(2)
      # Mirror the regex's five-digit fields so that unrelated files such as
      # "<prefix>-extra-00001-of-00002.<ext>" are not picked up.
      five_digits = "[0-9]" * 5
      # Escape the literal parts: a directory or prefix containing glob
      # metacharacters ("[", "]", "*", "?") must be matched verbatim.
      pattern = os.path.join(
          glob.escape(dir_name),
          f"{glob.escape(prefix)}-{five_digits}-of-{five_digits}.{extension}",
      )
      return sorted(glob.glob(pattern))