mem_check.py

import argparse
import json
import os
import time

import ray
from ray._private.memory_monitor import MemoryMonitor, get_top_n_memory_usage
from ray._private.test_utils import raw_metrics
from ray.job_submission import JobSubmissionClient, JobStatus

# Initialize Ray to avoid autosuspend.
addr = ray.init()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--working-dir",
        required=True,
        help="working_dir to use for the job within this test.",
    )
    args = parser.parse_args()

    # If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
    client = JobSubmissionClient("http://127.0.0.1:8265")
    job_id = client.submit_job(
        # Entrypoint shell command to execute.
        entrypoint="python workload.py",
        # Ship the working directory so workload.py is available to the job.
        runtime_env={"working_dir": args.working_dir},
    )
    print(job_id)

    m = MemoryMonitor()
    start = time.time()
    initial_used_gb = m.get_memory_usage()[0]
    terminal_states = {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}

    # Poll the job and monitor memory usage for up to 3 hours.
    while time.time() - start < 3600 * 3:
        print(f"{round((time.time() - start) / 60, 2)}m passed...")
        m.raise_if_low_memory()
        used_gb = m.get_memory_usage()[0]
        print("Used GB: ", used_gb)
        print(get_top_n_memory_usage())
        print("\n\n")

        # Stop polling once the job reaches a terminal state.
        status = client.get_job_status(job_id)
        print(f"Job status: {status}")
        if status in terminal_states:
            break

        time.sleep(15)

    ending_used_gb = m.get_memory_usage()[0]
    mem_growth = ending_used_gb - initial_used_gb
    top_n_mem_usage = get_top_n_memory_usage()
    print(top_n_mem_usage)
    print(f"Memory growth: {mem_growth} GB")

    # Fail the test if the job did not finish successfully.
    if status in (JobStatus.FAILED, JobStatus.STOPPED):
        print(client.get_job_logs(job_id))
        assert False, "Job has failed."

    # Verify the dashboard agent's memory usage via Ray's internal metrics.
    me = raw_metrics(addr)
    found = False
    for metric, samples in me.items():
        if metric == "ray_component_uss_mb":
            for sample in samples:
                if sample.labels["Component"] == "agent":
                    print(f"Metrics found memory usage: {sample.value} MB")
                    found = True
                    # Make sure the agent doesn't use more than 500 MB of memory.
                    assert sample.value < 500
    assert found, "Agent memory metrics are not found."

    # Write the results to the path given by the TEST_OUTPUT_JSON environment variable.
    with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
        results = {
            "memory_growth_gb": mem_growth,
            "success": 1,
        }
        results["perf_metrics"] = [
            {
                "perf_metric_name": "memory_growth_gb",
                "perf_metric_value": mem_growth,
                "perf_metric_type": "LATENCY",
            }
        ]
        f.write(json.dumps(results))
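
# A minimal sketch of how this script is expected to be invoked, assuming a
# local Ray cluster whose dashboard listens on the default port 8265 and a
# working directory that contains workload.py (the path below is illustrative):
#
#   ray start --head
#   python mem_check.py --working-dir ./job_working_dir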