metric_utils.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import threading
  2. import time
  3. def get_ray_spilled_and_restored_mb():
  4. import ray._private.internal_api as internal_api
  5. import re
  6. summary_str = internal_api.memory_summary(stats_only=True)
  7. match = re.search("Spilled (\d+) MiB", summary_str)
  8. spilled_mb = int(match.group(1)) if match else 0
  9. match = re.search("Restored (\d+) MiB", summary_str)
  10. restored_mb = int(match.group(1)) if match else 0
  11. return spilled_mb, restored_mb
  12. class MaxMemoryUtilizationTracker:
  13. """
  14. Class that enables tracking of the maximum memory utilization on a
  15. system.
  16. This creates a thread which samples the available memory every sample_interval_s
  17. seconds. The "available" memory is reported directly from psutil.
  18. See https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory for more
  19. information.
  20. """
  21. def __init__(self, sample_interval_s: float):
  22. self._results = {}
  23. self._stop_event = threading.Event()
  24. self._print_updates = False
  25. self._thread = threading.Thread(
  26. target=self._track_memory_utilization,
  27. args=(
  28. sample_interval_s,
  29. self._print_updates,
  30. self._results,
  31. self._stop_event,
  32. ),
  33. )
  34. @staticmethod
  35. def _track_memory_utilization(
  36. sample_interval_s: float,
  37. print_updates: bool,
  38. output_dict: dict,
  39. stop_event: threading.Event,
  40. ):
  41. import psutil
  42. min_available = float("inf")
  43. while not stop_event.is_set():
  44. memory_stats = psutil.virtual_memory()
  45. if memory_stats.available < min_available:
  46. if print_updates:
  47. print(
  48. "{before:.02f} -> {after:.02f}".format(
  49. before=min_available / (1 << 30),
  50. after=memory_stats.available / (1 << 30),
  51. )
  52. )
  53. min_available = memory_stats.available
  54. time.sleep(sample_interval_s)
  55. output_dict["min_available_bytes"] = min_available
  56. def start(self) -> None:
  57. assert (
  58. not self._stop_event.is_set()
  59. ), "Can't start a thread that has been stopped."
  60. self._thread.start()
  61. def stop(self) -> int:
  62. assert (
  63. not self._stop_event.is_set()
  64. ), "Can't stop a thread that has been stopped."
  65. self._stop_event.set()
  66. self._thread.join()
  67. return self._results["min_available_bytes"]
  68. def determine_if_memory_monitor_is_enabled_in_latest_session():
  69. """
  70. Grep session_latest raylet logs to see if the memory monitor is enabled.
  71. This is really only helpful when you're interested in session_latest, use with care.
  72. """
  73. import subprocess
  74. completed_proc = subprocess.run(
  75. [
  76. "grep",
  77. "-q",
  78. "MemoryMonitor initialized",
  79. "/tmp/ray/session_latest/logs/raylet.out",
  80. ],
  81. stdout=subprocess.PIPE,
  82. stderr=subprocess.PIPE,
  83. )
  84. assert completed_proc.returncode in [
  85. 0,
  86. 1,
  87. ], f"Unexpected returncode {completed_proc.returncode}"
  88. assert not completed_proc.stdout, f"Unexpected stdout {completed_proc.stdout}"
  89. assert not completed_proc.stderr, f"Unexpected stderr {completed_proc.stderr}"
  90. return completed_proc.returncode == 0
  91. def test_max_mem_util_tracker():
  92. max_mem_tracker = MaxMemoryUtilizationTracker(sample_interval_s=1)
  93. max_mem_tracker.start()
  94. import numpy as np
  95. time.sleep(4)
  96. print("create numpy")
  97. large_tensor = np.random.randint(10, size=1 << 30, dtype=np.uint8)
  98. large_tensor += 1
  99. print("done create numpy")
  100. time.sleep(2)
  101. results = max_mem_tracker.stop()
  102. min_available_gb = results["min_available_bytes"] / (1 << 30)
  103. print(f"{min_available_gb:.02f}")
  104. if __name__ == "__main__":
  105. test_max_mem_util_tracker()