task.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import asyncio
  2. from functools import partial
  3. import logging
  4. import threading
  5. import time
  6. from typing import Callable, Optional
  7. from gpustack.utils.process import threading_stop_event
  8. logger = logging.getLogger(__name__)
  9. def run_periodically(
  10. func: Callable[[], None],
  11. interval: float = 5,
  12. initial_delay: float = 0,
  13. stop_event: Optional[threading.Event] = None,
  14. *args,
  15. **kwargs,
  16. ) -> None:
  17. """
  18. Repeatedly run a function with a given interval.
  19. Args:
  20. func: The function to be executed.
  21. interval: The interval in seconds.
  22. initial_delay: The initial delay in seconds.
  23. stop_event: The event to stop the function.
  24. *args: Positional arguments to pass to the function.
  25. **kwargs: Keyword arguments to pass to the function.
  26. """
  27. if stop_event is None:
  28. stop_event = threading.Event()
  29. if initial_delay > 0:
  30. time.sleep(initial_delay)
  31. while not stop_event.is_set():
  32. try:
  33. func(*args, **kwargs)
  34. except Exception as e:
  35. logger.error(f"Error running {func.__name__}: {e}")
  36. if stop_event.is_set():
  37. break
  38. time.sleep(interval)
  39. def run_periodically_in_thread(
  40. func: Callable,
  41. interval: float,
  42. initial_delay: float = 0,
  43. stop_event: Optional[threading.Event] = threading_stop_event,
  44. *args,
  45. **kwargs,
  46. ) -> threading.Thread:
  47. """
  48. Repeatedly run a function asynchronously with a given interval.
  49. Args:
  50. func: The function to be executed.
  51. interval: The interval time in seconds.
  52. initial_delay: The initial delay in seconds.
  53. stop_event: Optional; The event used to stop the periodic execution.
  54. *args: Positional arguments to pass to the function.
  55. **kwargs: Keyword arguments to pass to the function.
  56. Returns:
  57. The thread running the periodic function.
  58. """
  59. thread = threading.Thread(
  60. target=run_periodically,
  61. args=(func, interval, initial_delay, stop_event) + args,
  62. kwargs=kwargs,
  63. daemon=True,
  64. )
  65. thread.start()
  66. return thread
  67. async def run_in_thread(sync_func, timeout: Optional[float] = None, *args, **kwargs):
  68. task = asyncio.to_thread(partial(sync_func, *args, **kwargs))
  69. if timeout is None:
  70. return await task
  71. return await asyncio.wait_for(task, timeout=timeout)