- """
- Example of a custom gym environment and model. Run this for a demo.
- This example shows:
- - using a custom environment
- - using a custom model
- - using Tune for grid search to try different learning rates
- You can visualize experiment results in ~/ray_results using TensorBoard.
- Run example with defaults:
- $ python custom_env.py
- For CLI options:
- $ python custom_env.py --help
- """
import argparse
import gym
from gym.spaces import Discrete, Box
import numpy as np
import os
import random

import ray
from ray import tune
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet import FullyConnectedNetwork
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.tune.logger import pretty_print

tf1, tf, tfv = try_import_tf()
torch, nn = try_import_torch()

parser = argparse.ArgumentParser()
parser.add_argument(
    "--run",
    type=str,
    default="PPO",
    help="The RLlib-registered algorithm to use.")
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "tfe", "torch"],
    default="tf",
    help="The DL framework specifier.")
parser.add_argument(
    "--as-test",
    action="store_true",
    help="Whether this script should be run as a test: --stop-reward must "
    "be achieved within --stop-timesteps AND --stop-iters.")
parser.add_argument(
    "--stop-iters",
    type=int,
    default=50,
    help="Number of iterations to train.")
parser.add_argument(
    "--stop-timesteps",
    type=int,
    default=100000,
    help="Number of timesteps to train.")
parser.add_argument(
    "--stop-reward",
    type=float,
    default=0.1,
    help="Reward at which we stop training.")
parser.add_argument(
    "--no-tune",
    action="store_true",
    help="Run without Tune, using a manual train loop instead. In this "
    "case, use PPO without grid search and without TensorBoard.")
parser.add_argument(
    "--local-mode",
    action="store_true",
    help="Init Ray in local mode for easier debugging.")


class SimpleCorridor(gym.Env):
    """Example of a custom env in which you have to walk down a corridor.

    You can configure the length of the corridor via the env config."""

    def __init__(self, config: EnvContext):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(
            0.0, self.end_pos, shape=(1, ), dtype=np.float32)
        # Set the seed. This is only used for the final (reach goal) reward.
        self.seed(config.worker_index * config.num_workers)

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        assert action in [0, 1], action
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        # Produce a random reward when we reach the goal.
        return [self.cur_pos], \
            random.random() * 2 if done else -0.1, done, {}

    def seed(self, seed=None):
        random.seed(seed)
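

# A hedged sanity-check sketch: roll out one episode with a uniformly random
# policy. SimpleCorridor's `seed()` reads `worker_index` and `num_workers`
# from its config, so we build an explicit EnvContext rather than passing a
# plain dict (this assumes EnvContext can be constructed directly this way).
def random_rollout_demo(corridor_length=5):
    env = SimpleCorridor(
        EnvContext({"corridor_length": corridor_length},
                   worker_index=0, num_workers=1))
    obs = env.reset()
    done, total_reward = False, 0.0
    while not done:
        # Sample random actions until reaching the goal ends the episode.
        obs, reward, done, _ = env.step(env.action_space.sample())
        total_reward += reward
    return total_reward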


class CustomModel(TFModelV2):
    """Example of a keras custom model that just delegates to an fc-net."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        super(CustomModel, self).__init__(obs_space, action_space,
                                          num_outputs, model_config, name)
        self.model = FullyConnectedNetwork(obs_space, action_space,
                                           num_outputs, model_config, name)

    def forward(self, input_dict, state, seq_lens):
        return self.model.forward(input_dict, state, seq_lens)

    def value_function(self):
        return self.model.value_function()


class TorchCustomModel(TorchModelV2, nn.Module):
    """Example of a PyTorch custom model that just delegates to a fc-net."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        self.torch_sub_model = TorchFC(obs_space, action_space, num_outputs,
                                       model_config, name)

    def forward(self, input_dict, state, seq_lens):
        input_dict["obs"] = input_dict["obs"].float()
        fc_out, _ = self.torch_sub_model(input_dict, state, seq_lens)
        return fc_out, []

    def value_function(self):
        return torch.reshape(self.torch_sub_model.value_function(), [-1])


if __name__ == "__main__":
    args = parser.parse_args()
    print(f"Running with following CLI options: {args}")

    ray.init(local_mode=args.local_mode)

    # Can also register the env creator function explicitly with:
    # register_env("corridor", lambda config: SimpleCorridor(config))
    ModelCatalog.register_custom_model(
        "my_model", TorchCustomModel
        if args.framework == "torch" else CustomModel)

    config = {
        "env": SimpleCorridor,  # or "corridor" if registered above
        "env_config": {
            "corridor_length": 5,
        },
        # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
        "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "model": {
            "custom_model": "my_model",
            "vf_share_layers": True,
        },
        "num_workers": 1,  # parallelism
        "framework": args.framework,
    }
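
    # The module docstring mentions grid-searching learning rates. A sketch
    # of how that could be added here (our assumption, not part of the fixed
    # config above; grid_search placeholders only resolve on the Tune path
    # below, not in the manual --no-tune loop):
    #
    #   config["lr"] = tune.grid_search([1e-2, 1e-4, 1e-6])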

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }

    if args.no_tune:
        # Manual training loop (no Ray Tune) using PPO and a fixed
        # learning rate.
        if args.run != "PPO":
            raise ValueError("Only --run PPO is supported with --no-tune.")
        print("Running manual train loop without Ray Tune.")
        ppo_config = ppo.DEFAULT_CONFIG.copy()
        ppo_config.update(config)
        # Use a fixed learning rate instead of grid search (needs Tune).
        ppo_config["lr"] = 1e-3
        trainer = ppo.PPOTrainer(config=ppo_config, env=SimpleCorridor)
        # Run the manual training loop and print results after each iteration.
        for _ in range(args.stop_iters):
            result = trainer.train()
            print(pretty_print(result))
            # Stop training if the target train steps or reward are reached.
            if result["timesteps_total"] >= args.stop_timesteps or \
                    result["episode_reward_mean"] >= args.stop_reward:
                break
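
        # A hedged sketch of querying the trained policy directly
        # (`compute_action` is the Trainer's single-observation inference
        # call in this Ray version; treat the exact call as an assumption):
        #
        #   env = SimpleCorridor(EnvContext(config["env_config"],
        #                                   worker_index=0, num_workers=1))
        #   obs = env.reset()
        #   action = trainer.compute_action(obs)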
    else:
        # Automated run with Tune, grid search, and TensorBoard.
        print("Training automatically with Ray Tune")
        results = tune.run(args.run, config=config, stop=stop)
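
        # Per the docstring, results land in ~/ray_results and can be
        # inspected with TensorBoard, e.g.:
        #   tensorboard --logdir ~/ray_results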

        if args.as_test:
            print("Checking if learning goals were achieved")
            check_learning_achieved(results, args.stop_reward)

    ray.shutdown()