release_test_util.py

import glob
import os
import time

import ray

from lightgbm_ray import (
    train,
    RayDMatrix,
    RayFileType,
    RayParams,
    RayDeviceQuantileDMatrix,
)
from lightgbm_ray.tune import _TuneLGBMRank0Mixin
from lightgbm.callback import CallbackEnv

# Remove any inherited OMP_NUM_THREADS so LightGBM is not artificially
# limited to the driver's thread setting on the Ray actors.
if "OMP_NUM_THREADS" in os.environ:
    del os.environ["OMP_NUM_THREADS"]


@ray.remote
class FailureState:
    """Actor that tracks which failure-injection IDs have already triggered."""

    def __init__(self):
        self._failed_ids = set()

    def set_failed(self, id):
        if id in self._failed_ids:
            return False
        self._failed_ids.add(id)
        return True

    def has_failed(self, id):
        return id in self._failed_ids


class FailureInjection(_TuneLGBMRank0Mixin):
    """Callback that kills the worker process once a target iteration is reached."""

    def __init__(self, id, state, ranks, iteration):
        self._id = id
        self._state = state
        self._ranks = ranks or []
        self._iteration = iteration

    def __call__(self, env: CallbackEnv):
        if env.iteration == self._iteration:
            rank = 0 if self.is_rank_0 else 1
            if rank in self._ranks:
                if not ray.get(self._state.has_failed.remote(self._id)):
                    success = ray.get(self._state.set_failed.remote(self._id))
                    if not success:
                        # Another rank is already about to fail
                        return

                    pid = os.getpid()
                    print(f"Killing process: {pid} for actor rank {rank}")
                    time.sleep(1)
                    os.kill(pid, 9)

    order = 2


class TrackingCallback(_TuneLGBMRank0Mixin):
    """Callback that logs training progress from the rank 0 worker."""

    def __call__(self, env: CallbackEnv):
        if self.is_rank_0:
            print(f"[Rank 0] I am at iteration {env.iteration}")

    order = 1
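
# Illustrative sketch (not part of the original file): these callbacks are
# intended to be forwarded to lightgbm_ray.train() through the ``callbacks=``
# keyword, which train_ray() below passes on via **kwargs. The failure_state
# handle, ranks, and iteration values here are placeholders chosen for
# illustration only.
#
#     failure_state = FailureState.remote()
#     callbacks = [
#         TrackingCallback(),
#         FailureInjection(
#             id="first_fail", state=failure_state, ranks=[1], iteration=14
#         ),
#     ]
#     train_ray("/data/classification.parquet", 4, 100, callbacks=callbacks)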


def train_ray(
    path,
    num_workers,
    num_boost_rounds,
    num_files=0,
    regression=False,
    use_gpu=False,
    ray_params=None,
    lightgbm_params=None,
    **kwargs,
):
    path = os.path.expanduser(path)
    if not os.path.exists(path):
        raise ValueError(f"Path does not exist: {path}")

    if num_files:
        # Repeat the available parquet files until at least `num_files` entries exist.
        files = sorted(glob.glob(f"{path}/**/*.parquet"))
        while num_files > len(files):
            files = files + files
        path = files[0:num_files]

    # Only use the device (GPU) matrix if cupy is importable.
    use_device_matrix = False
    if use_gpu:
        try:
            import cupy  # noqa: F401

            use_device_matrix = True
        except ImportError:
            use_device_matrix = False

    if use_device_matrix:
        dtrain = RayDeviceQuantileDMatrix(
            path,
            num_actors=num_workers,
            label="labels",
            ignore=["partition"],
            filetype=RayFileType.PARQUET,
        )
    else:
        dtrain = RayDMatrix(
            path,
            num_actors=num_workers,
            label="labels",
            ignore=["partition"],
            filetype=RayFileType.PARQUET,
        )

    config = {"device": "cpu" if not use_gpu else "gpu"}

    if not regression:
        # Classification
        config.update(
            {
                "objective": "binary",
                "metric": ["binary_logloss", "binary_error"],
            }
        )
    else:
        # Regression
        config.update(
            {
                "objective": "regression",
                "metric": ["l2", "rmse"],
            }
        )

    if lightgbm_params:
        config.update(lightgbm_params)

    start = time.time()
    evals_result = {}
    additional_results = {}
    bst = train(
        config,
        dtrain,
        evals_result=evals_result,
        additional_results=additional_results,
        num_boost_round=num_boost_rounds,
        ray_params=ray_params
        or RayParams(
            max_actor_restarts=2,
            num_actors=num_workers,
            cpus_per_actor=2,
            gpus_per_actor=0 if not use_gpu else 1,
        ),
        evals=[(dtrain, "train")],
        **kwargs,
    )
    taken = time.time() - start
    print(f"TRAIN TIME TAKEN: {taken:.2f} seconds")

    out_file = os.path.expanduser(
        "~/benchmark_{}.lgbm".format("cpu" if not use_gpu else "gpu")
    )
    bst.booster_.save_model(out_file)

    print(
        "Final training error: {:.4f}".format(
            evals_result["train"]["binary_error" if not regression else "rmse"][-1]
        )
    )
    return bst, additional_results, taken
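

# Illustrative usage sketch (added for clarity; not part of the original
# release workload). The dataset path, worker count, and boost rounds below
# are placeholder values -- the actual release tests import train_ray() and
# pass their own settings.
if __name__ == "__main__":
    ray.init(address="auto")
    bst, additional_results, taken = train_ray(
        path="/data/classification.parquet",  # placeholder dataset location
        num_workers=4,
        num_boost_rounds=100,
        num_files=16,
        regression=False,
        use_gpu=False,
        # Forwarded to lightgbm_ray.train() via **kwargs.
        callbacks=[TrackingCallback()],
    )
    print(f"Training finished in {taken:.2f} seconds")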