# release_test_util.py

import glob
import os
import time

import ray
from xgboost_ray import (
    train,
    RayDMatrix,
    RayFileType,
    RayDeviceQuantileDMatrix,
    RayParams,
)
from xgboost_ray.session import get_actor_rank, put_queue
from xgboost.callback import TrainingCallback
from xgboost.rabit import get_world_size

if "OMP_NUM_THREADS" in os.environ:
    del os.environ["OMP_NUM_THREADS"]


@ray.remote
class FailureState:
    """Actor that tracks which failure injection IDs have already fired."""

    def __init__(self):
        self._failed_ids = set()

    def set_failed(self, id):
        if id in self._failed_ids:
            return False
        self._failed_ids.add(id)
        return True

    def has_failed(self, id):
        return id in self._failed_ids


class FailureInjection(TrainingCallback):
    """Training callback that kills the worker process of selected ranks
    at a given boosting iteration to test fault tolerance."""

    def __init__(self, id, state, ranks, iteration):
        self._id = id
        self._state = state
        self._ranks = ranks or []
        self._iteration = iteration
        super().__init__()

    def after_iteration(self, model, epoch, evals_log):
        if epoch == self._iteration:
            rank = get_actor_rank()
            if rank in self._ranks:
                if not ray.get(self._state.has_failed.remote(self._id)):
                    success = ray.get(self._state.set_failed.remote(self._id))
                    if not success:
                        # Another rank is already about to fail
                        return

                    pid = os.getpid()
                    print(f"Killing process: {pid} for actor rank {rank}")
                    time.sleep(1)
                    os.kill(pid, 9)


class TrackingCallback(TrainingCallback):
    """Callback that pushes the rabit world size to the actor result queue
    on every iteration (and logs progress on rank 3)."""

    def before_iteration(self, model, epoch, evals_log):
        if get_actor_rank() == 3:
            print(f"[Rank {get_actor_rank()}] I am at iteration {epoch}")
        put_queue(get_world_size())
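

# Illustrative sketch (not in the original file): one way FailureState,
# FailureInjection and TrackingCallback could be wired into a run. The
# worker count, target rank and failure iteration below are placeholder
# assumptions; `callbacks` is assumed to be forwarded through train_ray's
# **kwargs to xgboost.train on each actor.
def _example_failure_injection_run(data_path, num_workers=4):
    fail_state = FailureState.remote()
    return train_ray(  # train_ray is defined further down in this module
        path=data_path,
        num_workers=num_workers,
        num_boost_rounds=100,
        num_files=num_workers,
        callbacks=[
            TrackingCallback(),
            FailureInjection(
                id="fail_once", state=fail_state, ranks=[1], iteration=14
            ),
        ],
    )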


def get_parquet_files(path, num_files=0):
    """Return `num_files` parquet file paths found under `path`, repeating
    the file list if the directory contains fewer files."""
    path = os.path.expanduser(path)
    if not os.path.exists(path):
        raise ValueError(f"Path does not exist: {path}")

    files = sorted(glob.glob(f"{path}/**/*.parquet"))
    while num_files > len(files):
        files = files + files
    return files[0:num_files]


def train_ray(
    path,
    num_workers,
    num_boost_rounds,
    num_files=0,
    regression=False,
    use_gpu=False,
    ray_params=None,
    xgboost_params=None,
    **kwargs,
):
    """Train an XGBoost model on parquet data with xgboost_ray.

    Builds a (device) RayDMatrix from the given parquet files, trains for
    `num_boost_rounds`, saves the model to ~/benchmark_{cpu,gpu}.xgb, and
    returns the booster, the additional results dict, and the wall-clock
    training time in seconds.
    """
    if not isinstance(path, list):
        path = get_parquet_files(path, num_files=num_files)

    use_device_matrix = False
    if use_gpu:
        try:
            import cupy  # noqa: F401

            use_device_matrix = True
        except ImportError:
            use_device_matrix = False

    if use_device_matrix:
        dtrain = RayDeviceQuantileDMatrix(
            path,
            num_actors=num_workers,
            label="labels",
            ignore=["partition"],
            filetype=RayFileType.PARQUET,
        )
    else:
        dtrain = RayDMatrix(
            path,
            num_actors=num_workers,
            label="labels",
            ignore=["partition"],
            filetype=RayFileType.PARQUET,
        )

    config = {"tree_method": "hist" if not use_gpu else "gpu_hist"}

    if not regression:
        # Classification
        config.update(
            {
                "objective": "binary:logistic",
                "eval_metric": ["logloss", "error"],
            }
        )
    else:
        # Regression
        config.update(
            {
                "objective": "reg:squarederror",
                "eval_metric": ["logloss", "rmse"],
            }
        )

    if xgboost_params:
        config.update(xgboost_params)

    start = time.time()
    evals_result = {}
    additional_results = {}
    bst = train(
        config,
        dtrain,
        evals_result=evals_result,
        additional_results=additional_results,
        num_boost_round=num_boost_rounds,
        ray_params=ray_params
        or RayParams(
            max_actor_restarts=2,
            num_actors=num_workers,
            cpus_per_actor=1,
            gpus_per_actor=0 if not use_gpu else 1,
        ),
        evals=[(dtrain, "train")],
        **kwargs,
    )
    taken = time.time() - start
    print(f"TRAIN TIME TAKEN: {taken:.2f} seconds")

    out_file = os.path.expanduser(
        "~/benchmark_{}.xgb".format("cpu" if not use_gpu else "gpu")
    )
    bst.save_model(out_file)

    print("Final training error: {:.4f}".format(evals_result["train"]["error"][-1]))
    return bst, additional_results, taken
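

# Illustrative sketch (not in the original file): a minimal CPU invocation of
# train_ray. The Ray init call, data directory and run sizes are placeholder
# assumptions for a local run, not values used by any release test.
if __name__ == "__main__":
    ray.init()
    bst, additional_results, taken = train_ray(
        path="~/data/parquet",  # assumed dir of *.parquet shards with a "labels" column
        num_workers=4,
        num_boost_rounds=100,
        num_files=4,
        regression=False,
        use_gpu=False,
    )
    print(f"Finished benchmark in {taken:.2f} seconds")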