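"""End-to-end tests for RLlib's command line scripts: train.py and evaluate.py.

Each test trains an algorithm for a short time (or until a target reward),
then rolls the resulting checkpoint out via evaluate.py and verifies the
rollout output.
"""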

import os
from pathlib import Path
import re
import sys
import tempfile
import unittest

import ray
from ray import tune
from ray.rllib.examples.env.multi_agent import MultiAgentCartPole
from ray.rllib.utils.test_utils import framework_iterator


def evaluate_test(algo, env="CartPole-v0", test_episode_rollout=False):
    """Trains `algo` for one iteration via train.py, then rolls the
    resulting checkpoint out via evaluate.py."""
    extra_config = ""
    if algo == "ARS":
        extra_config = ", \"train_batch_size\": 10, \"noise_size\": 250000"
    elif algo == "ES":
        extra_config = ", \"episodes_per_batch\": 1, \"train_batch_size\": 10, " \
            "\"noise_size\": 250000"

    for fw in framework_iterator(frameworks=("tf", "torch")):
        fw_ = ", \"framework\": \"{}\"".format(fw)

        tmp_dir = os.popen("mktemp -d").read()[:-1]
        if not os.path.exists(tmp_dir):
            sys.exit(1)

        print("Saving results to {}".format(tmp_dir))

        rllib_dir = str(Path(__file__).parent.parent.absolute())
        print("RLlib dir = {}\nexists={}".format(
            rllib_dir, os.path.exists(rllib_dir)))
        os.system(
            "python {}/train.py --local-dir={} --run={} "
            "--checkpoint-freq=1 ".format(rllib_dir, tmp_dir, algo) +
            "--config='{" + "\"num_workers\": 1, \"num_gpus\": 0{}{}".format(
                fw_, extra_config) +
            ", \"timesteps_per_iteration\": 5, \"min_time_s_per_reporting\": 0.1, "
            "\"model\": {\"fcnet_hiddens\": [10]}"
            "}' --stop='{\"training_iteration\": 1}'" +
            " --env={} --no-ray-ui".format(env))
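
        # For reference, the command assembled above expands to roughly the
        # following (a sketch; the exact config string depends on `algo`,
        # `fw`, and `extra_config`):
        #   python <rllib_dir>/train.py --local-dir=<tmp_dir> --run=PPO \
        #     --checkpoint-freq=1 \
        #     --config='{"num_workers": 1, "num_gpus": 0, "framework": "tf",
        #       "timesteps_per_iteration": 5, "min_time_s_per_reporting": 0.1,
        #       "model": {"fcnet_hiddens": [10]}}' \
        #     --stop='{"training_iteration": 1}' --env=CartPole-v0 --no-ray-ui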

        checkpoint_path = os.popen("ls {}/default/*/checkpoint_000001/"
                                   "checkpoint-1".format(tmp_dir)).read()[:-1]
        if not os.path.exists(checkpoint_path):
            sys.exit(1)
        print("Checkpoint path {} (exists)".format(checkpoint_path))

        # Test rolling out n steps.
        os.popen("python {}/evaluate.py --run={} \"{}\" --steps=10 "
                 "--out=\"{}/rollouts_10steps.pkl\" --no-render".format(
                     rllib_dir, algo, checkpoint_path, tmp_dir)).read()
        if not os.path.exists(tmp_dir + "/rollouts_10steps.pkl"):
            sys.exit(1)
        print("evaluate output (10 steps) exists!")

        # Test rolling out 1 episode.
        if test_episode_rollout:
            os.popen("python {}/evaluate.py --run={} \"{}\" --episodes=1 "
                     "--out=\"{}/rollouts_1episode.pkl\" --no-render".format(
                         rllib_dir, algo, checkpoint_path, tmp_dir)).read()
            if not os.path.exists(tmp_dir + "/rollouts_1episode.pkl"):
                sys.exit(1)
            print("evaluate output (1 ep) exists!")

        # Cleanup.
        os.popen("rm -rf \"{}\"".format(tmp_dir)).read()


def learn_test_plus_evaluate(algo, env="CartPole-v0"):
    """Trains `algo` via train.py until a mean episode reward of 100.0 is
    reached, then rolls the best checkpoint out and checks the reward."""
    for fw in framework_iterator(frameworks=("tf", "torch")):
        fw_ = ", \\\"framework\\\": \\\"{}\\\"".format(fw)

        tmp_dir = os.popen("mktemp -d").read()[:-1]
        if not os.path.exists(tmp_dir):
            # Last resort: Resolve via the underlying temp directory (and cut
            # the "/tmp" prefix off the mktemp result).
            tmp_dir = tempfile.gettempdir() + tmp_dir[4:]
            if not os.path.exists(tmp_dir):
                sys.exit(1)

        print("Saving results to {}".format(tmp_dir))

        rllib_dir = str(Path(__file__).parent.parent.absolute())
        print("RLlib dir = {}\nexists={}".format(
            rllib_dir, os.path.exists(rllib_dir)))

        # Train until the stop criterion (mean episode reward >= 100.0) is
        # met, checkpointing every iteration and at the end.
        os.system("python {}/train.py --local-dir={} --run={} "
                  "--checkpoint-freq=1 --checkpoint-at-end ".format(
                      rllib_dir, tmp_dir, algo) +
                  "--config=\"{\\\"num_gpus\\\": 0, \\\"num_workers\\\": 1, "
                  "\\\"evaluation_config\\\": {\\\"explore\\\": false}" + fw_ +
                  "}\" " + "--stop=\"{\\\"episode_reward_mean\\\": 100.0}\"" +
                  " --env={}".format(env))

        # Find the last checkpoint and use that for the rollout.
        checkpoint_path = os.popen("ls {}/default/*/checkpoint_*/"
                                   "checkpoint-*".format(tmp_dir)).read()[:-1]
        checkpoints = [
            cp for cp in checkpoint_path.split("\n")
            if re.match(r"^.+checkpoint-\d+$", cp)
        ]
        # Sort by checkpoint number and pick the last one (which should be
        # the best checkpoint, given training stopped at the reward target).
        last_checkpoint = sorted(
            checkpoints,
            key=lambda x: int(re.match(r".+checkpoint-(\d+)", x).group(1)))[-1]
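        # E.g., ".../checkpoint_000010/checkpoint-10" sorts after
        # ".../checkpoint_000002/checkpoint-2", since we compare the parsed
        # integers (10 > 2) rather than the raw strings.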
        assert re.match(r"^.+checkpoint_\d+/checkpoint-\d+$", last_checkpoint)
        if not os.path.exists(last_checkpoint):
            sys.exit(1)
        print("Best checkpoint={} (exists)".format(last_checkpoint))

        # Test rolling out n steps.
        result = os.popen(
            "python {}/evaluate.py --run={} "
            "--steps=400 "
            "--out=\"{}/rollouts_n_steps.pkl\" --no-render \"{}\"".format(
                rllib_dir, algo, tmp_dir, last_checkpoint)).read()[:-1]
        if not os.path.exists(tmp_dir + "/rollouts_n_steps.pkl"):
            sys.exit(1)
        print("Rollout output exists -> Checking reward ...")

        # Parse the per-episode rewards printed by evaluate.py and check
        # their mean against the training stop criterion.
        episodes = result.split("\n")
        mean_reward = 0.0
        num_episodes = 0
        for ep in episodes:
            mo = re.match(r"Episode .+reward: ([\d\.\-]+)", ep)
            if mo:
                mean_reward += float(mo.group(1))
                num_episodes += 1
        mean_reward /= num_episodes
        print("Rollout's mean episode reward={}".format(mean_reward))
        assert mean_reward >= 100.0

        # Cleanup.
        os.popen("rm -rf \"{}\"".format(tmp_dir)).read()
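

# Note: The reward-parsing loop above is repeated verbatim in
# `learn_test_multi_agent_plus_evaluate` below. A minimal shared helper could
# look like the following sketch (hypothetical; the tests as written keep
# their inline loops):
def _mean_episode_reward(evaluate_output):
    """Averages the `Episode ... reward: ...` lines printed by evaluate.py.

    Assumes at least one episode line is present in `evaluate_output`.
    """
    rewards = [
        float(mo.group(1)) for mo in (
            re.match(r"Episode .+reward: ([\d\.\-]+)", line)
            for line in evaluate_output.split("\n")) if mo
    ]
    return sum(rewards) / len(rewards)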


def learn_test_multi_agent_plus_evaluate(algo):
    """Same as `learn_test_plus_evaluate`, but with a two-policy multi-agent
    setup on MultiAgentCartPole, trained via tune.run() instead of train.py."""
    for fw in framework_iterator(frameworks=("tf", "torch")):
        tmp_dir = os.popen("mktemp -d").read()[:-1]
        if not os.path.exists(tmp_dir):
            # Last resort: Resolve via the underlying temp directory (and cut
            # the "/tmp" prefix off the mktemp result).
            tmp_dir = tempfile.gettempdir() + tmp_dir[4:]
            if not os.path.exists(tmp_dir):
                sys.exit(1)

        print("Saving results to {}".format(tmp_dir))

        rllib_dir = str(Path(__file__).parent.parent.absolute())
        print("RLlib dir = {}\nexists={}".format(
            rllib_dir, os.path.exists(rllib_dir)))

        def policy_fn(agent_id, episode, **kwargs):
            return "pol{}".format(agent_id)
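
        # policy_fn maps agent IDs one-to-one to policy IDs, e.g. agent 0 is
        # served by "pol0" and agent 1 by "pol1", matching the two policies
        # defined below.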
        config = {
            "num_gpus": 0,
            "num_workers": 1,
            "evaluation_config": {
                "explore": False,
            },
            "framework": fw,
            "env": MultiAgentCartPole,
            "multiagent": {
                "policies": {"pol0", "pol1"},
                "policy_mapping_fn": policy_fn,
            },
        }
        stop = {"episode_reward_mean": 100.0}

        tune.run(
            algo,
            config=config,
            stop=stop,
            checkpoint_freq=1,
            checkpoint_at_end=True,
            local_dir=tmp_dir,
            verbose=1)
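
        # tune.run() writes results under <local_dir>/<trainable>/<trial>/...,
        # so checkpoints for this run land in, e.g.,
        # <tmp_dir>/PPO/PPO_MultiAgentCartPole_.../checkpoint_000004/checkpoint-4.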
        # Find the last checkpoint and use that for the rollout.
        checkpoint_path = os.popen(
            "ls {}/{}/*/checkpoint_*/"
            "checkpoint-*".format(tmp_dir, algo)).read()[:-1]
        checkpoint_paths = checkpoint_path.split("\n")
        assert len(checkpoint_paths) > 0
        checkpoints = [
            cp for cp in checkpoint_paths
            if re.match(r"^.+checkpoint-\d+$", cp)
        ]
        # Sort by checkpoint number and pick the last one (which should be
        # the best checkpoint, given training stopped at the reward target).
        last_checkpoint = sorted(
            checkpoints,
            key=lambda x: int(re.match(r".+checkpoint-(\d+)", x).group(1)))[-1]
        assert re.match(r"^.+checkpoint_\d+/checkpoint-\d+$", last_checkpoint)
        if not os.path.exists(last_checkpoint):
            sys.exit(1)
        print("Best checkpoint={} (exists)".format(last_checkpoint))

        # Shut down the ray instance started by tune.run() above before
        # spawning the evaluate.py subprocess.
        ray.shutdown()

        # Test rolling out n steps.
        result = os.popen(
            "python {}/evaluate.py --run={} "
            "--steps=400 "
            "--out=\"{}/rollouts_n_steps.pkl\" --no-render \"{}\"".format(
                rllib_dir, algo, tmp_dir, last_checkpoint)).read()[:-1]
        if not os.path.exists(tmp_dir + "/rollouts_n_steps.pkl"):
            sys.exit(1)
        print("Rollout output exists -> Checking reward ...")

        episodes = result.split("\n")
        mean_reward = 0.0
        num_episodes = 0
        for ep in episodes:
            mo = re.match(r"Episode .+reward: ([\d\.\-]+)", ep)
            if mo:
                mean_reward += float(mo.group(1))
                num_episodes += 1
        mean_reward /= num_episodes
        print("Rollout's mean episode reward={}".format(mean_reward))
        assert mean_reward >= 100.0

        # Cleanup.
        os.popen("rm -rf \"{}\"".format(tmp_dir)).read()


class TestEvaluate1(unittest.TestCase):
    def test_a3c(self):
        evaluate_test("A3C")

    def test_ddpg(self):
        evaluate_test("DDPG", env="Pendulum-v1")


class TestEvaluate2(unittest.TestCase):
    def test_dqn(self):
        evaluate_test("DQN")

    def test_es(self):
        evaluate_test("ES")


class TestEvaluate3(unittest.TestCase):
    def test_impala(self):
        evaluate_test("IMPALA", env="CartPole-v0")

    def test_ppo(self):
        evaluate_test("PPO", env="CartPole-v0", test_episode_rollout=True)


class TestEvaluate4(unittest.TestCase):
    def test_sac(self):
        evaluate_test("SAC", env="Pendulum-v1")


class TestTrainAndEvaluate(unittest.TestCase):
    def test_ppo_train_then_rollout(self):
        learn_test_plus_evaluate("PPO")

    def test_ppo_multi_agent_train_then_rollout(self):
        learn_test_multi_agent_plus_evaluate("PPO")


if __name__ == "__main__":
    import pytest

    # A specific TestCase class may be passed as the first command line arg;
    # if none is given, all TestCase classes in this file are run.
    class_ = sys.argv[1] if len(sys.argv) > 1 else None
    sys.exit(
        pytest.main(
            ["-v", __file__ + ("" if class_ is None else "::" + class_)]))