# parallel_evaluation_and_training.py

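# Overview: this example configures an RLlib Trainer to run its evaluation
# loop in parallel with the training step (`evaluation_parallel_to_training`),
# distributing the evaluation load across `--evaluation-num-workers` dedicated
# workers. The evaluation duration can be a fixed number of episodes or
# timesteps, or "auto" to evaluate for roughly as long as one training step
# takes. The `AssertEvalCallback` below verifies after each `Trainer.train()`
# call that the configured evaluation duration was actually honored.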
import argparse
import os

from ray.rllib.agents.callbacks import DefaultCallbacks
from ray.rllib.utils.test_utils import check_learning_achieved
parser = argparse.ArgumentParser()
parser.add_argument(
    "--evaluation-duration",
    type=lambda v: v if v == "auto" else int(v),
    default=13,
    help="Number of evaluation episodes/timesteps to run each iteration. "
    "If 'auto', will run as many as possible during the train pass.")
parser.add_argument(
    "--evaluation-duration-unit",
    type=str,
    default="episodes",
    choices=["episodes", "timesteps"],
    help="The unit in which to measure the duration (`episodes` or "
    "`timesteps`).")
parser.add_argument(
    "--evaluation-num-workers",
    type=int,
    default=2,
    help="The number of evaluation workers to set up. "
    "0 for a single local evaluation worker. Note that for values >0, no "
    "local evaluation worker will be created (it is not needed in that case).")
parser.add_argument(
    "--evaluation-interval",
    type=int,
    default=2,
    help="Every how many train iterations should we run an evaluation loop?")
parser.add_argument(
    "--run",
    type=str,
    default="PPO",
    help="The RLlib-registered algorithm to use.")
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "tfe", "torch"],
    default="tf",
    help="The DL framework specifier.")
parser.add_argument(
    "--as-test",
    action="store_true",
    help="Whether this script should be run as a test: --stop-reward must "
    "be achieved within --stop-timesteps AND --stop-iters.")
parser.add_argument(
    "--stop-iters",
    type=int,
    default=200,
    help="Number of iterations to train.")
parser.add_argument(
    "--stop-timesteps",
    type=int,
    default=200000,
    help="Number of timesteps to train.")
parser.add_argument(
    "--stop-reward",
    type=float,
    default=180.0,
    help="Reward at which we stop training.")
parser.add_argument(
    "--local-mode",
    action="store_true",
    help="Init Ray in local mode for easier debugging.")
class AssertEvalCallback(DefaultCallbacks):
    def on_train_result(self, *, trainer, result, **kwargs):
        # Make sure we always run exactly the given evaluation duration,
        # no matter what the other settings are (such as
        # `evaluation_num_workers` or `evaluation_parallel_to_training`).
        if "evaluation" in result and "hist_stats" in result["evaluation"]:
            hist_stats = result["evaluation"]["hist_stats"]
            # We count in episodes.
            if trainer.config["evaluation_duration_unit"] == "episodes":
                num_episodes_done = len(hist_stats["episode_lengths"])
                # Compare the number of entries in episode_lengths (this is
                # the number of episodes actually run) with the desired
                # number of episodes from the config.
                if isinstance(trainer.config["evaluation_duration"], int):
                    assert num_episodes_done == \
                        trainer.config["evaluation_duration"]
                # If auto-episodes: Expect at least as many episodes as
                # workers (each worker's `sample()` is called at least once).
                else:
                    assert trainer.config["evaluation_duration"] == "auto"
                    assert num_episodes_done >= \
                        trainer.config["evaluation_num_workers"]
                print("Number of run evaluation episodes: "
                      f"{num_episodes_done} (ok)!")
            # We count in timesteps.
            else:
                num_timesteps_reported = result["evaluation"][
                    "timesteps_this_iter"]
                num_timesteps_wanted = trainer.config["evaluation_duration"]
                if num_timesteps_wanted != "auto":
                    delta = num_timesteps_wanted - num_timesteps_reported
                    # Expect roughly the desired number of timesteps (each
                    # worker runs `desired // num-eval-workers` steps, so
                    # allow for small rounding differences).
                    assert abs(delta) < 20, \
                        (delta, num_timesteps_wanted, num_timesteps_reported)
                print("Number of run evaluation timesteps: "
                      f"{num_timesteps_reported} (ok)!")

            print(f"R={result['evaluation']['episode_reward_mean']}")
if __name__ == "__main__":
    import ray
    from ray import tune

    args = parser.parse_args()

    ray.init(num_cpus=args.num_cpus or None, local_mode=args.local_mode)

    config = {
        "env": "CartPole-v0",
        # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
        "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "framework": args.framework,
        # Run with tracing enabled for tfe/tf2.
        "eager_tracing": args.framework in ["tfe", "tf2"],
        # Parallel evaluation + training config.
        # Switch on evaluation in parallel with training.
        "evaluation_parallel_to_training": True,
        # Use two evaluation workers. Must be >0, otherwise
        # evaluation will run on the local worker and block (no parallelism).
        "evaluation_num_workers": args.evaluation_num_workers,
        # Evaluate every other training iteration (together
        # with every other call to `Trainer.train()`).
        "evaluation_interval": args.evaluation_interval,
        # Run for n episodes/timesteps (properly distribute the load amongst
        # all eval workers). The longer it takes to evaluate, the more sense
        # it makes to use `evaluation_parallel_to_training=True`.
        # Use "auto" to run evaluation for roughly as long as the training
        # step takes.
        "evaluation_duration": args.evaluation_duration,
        # "episodes" or "timesteps".
        "evaluation_duration_unit": args.evaluation_duration_unit,
        # Use a custom callback that asserts that we run exactly the
        # configured number of episodes per evaluation, OR - in auto
        # mode - at least as many episodes as we have eval workers.
        "callbacks": AssertEvalCallback,
    }

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }

    results = tune.run(args.run, config=config, stop=stop, verbose=2)

    if args.as_test:
        check_learning_achieved(results, args.stop_reward)

    ray.shutdown()
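# Example invocations (assuming Ray RLlib and a CartPole-capable Gym are
# installed; all flags are the ones defined by the parser above):
#
#   python parallel_evaluation_and_training.py --evaluation-duration=13
#
#   python parallel_evaluation_and_training.py \
#       --evaluation-duration=auto --evaluation-duration-unit=timesteps \
#       --framework=torch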