from collections import deque

import gym
from gym import spaces
import numpy as np

from ray.rllib.utils.images import rgb2gray, resize


def is_atari(env):
    """Returns True if the given env looks like an Atari (ALE) env."""
    if (hasattr(env.observation_space, "shape")
            and env.observation_space.shape is not None
            and len(env.observation_space.shape) <= 2):
        # 1D/2D observation spaces (e.g. RAM observations) are not
        # image-based Atari envs for our purposes.
        return False
    return hasattr(env, "unwrapped") and hasattr(env.unwrapped, "ale")


def get_wrapper_by_cls(env, cls):
    """Returns the gym env wrapper of the given class, or None."""
    currentenv = env
    while True:
        if isinstance(currentenv, cls):
            return currentenv
        elif isinstance(currentenv, gym.Wrapper):
            currentenv = currentenv.env
        else:
            return None


class MonitorEnv(gym.Wrapper):
    def __init__(self, env=None):
        """Record episode stats prior to EpisodicLifeEnv, etc."""
        gym.Wrapper.__init__(self, env)
        self._current_reward = None
        self._num_steps = None
        self._total_steps = None
        self._episode_rewards = []
        self._episode_lengths = []
        self._num_episodes = 0
        self._num_returned = 0

    def reset(self, **kwargs):
        obs = self.env.reset(**kwargs)

        if self._total_steps is None:
            self._total_steps = sum(self._episode_lengths)

        if self._current_reward is not None:
            self._episode_rewards.append(self._current_reward)
            self._episode_lengths.append(self._num_steps)
            self._num_episodes += 1

        self._current_reward = 0
        self._num_steps = 0

        return obs

    def step(self, action):
        obs, rew, done, info = self.env.step(action)
        self._current_reward += rew
        self._num_steps += 1
        self._total_steps += 1
        return (obs, rew, done, info)

    def get_episode_rewards(self):
        return self._episode_rewards

    def get_episode_lengths(self):
        return self._episode_lengths

    def get_total_steps(self):
        return self._total_steps

    def next_episode_results(self):
        for i in range(self._num_returned, len(self._episode_rewards)):
            yield (self._episode_rewards[i], self._episode_lengths[i])
        self._num_returned = len(self._episode_rewards)


class NoopResetEnv(gym.Wrapper):
    def __init__(self, env, noop_max=30):
        """Sample initial states by taking a random number of no-ops on reset.

        No-op is assumed to be action 0.
        """
        gym.Wrapper.__init__(self, env)
        self.noop_max = noop_max
        self.override_num_noops = None
        self.noop_action = 0
        assert env.unwrapped.get_action_meanings()[0] == "NOOP"

    def reset(self, **kwargs):
        """Do no-op action for a number of steps in [1, noop_max]."""
        self.env.reset(**kwargs)
        if self.override_num_noops is not None:
            noops = self.override_num_noops
        else:
            noops = self.unwrapped.np_random.randint(1, self.noop_max + 1)
        assert noops > 0
        obs = None
        for _ in range(noops):
            obs, _, done, _ = self.env.step(self.noop_action)
            if done:
                obs = self.env.reset(**kwargs)
        return obs

    def step(self, ac):
        return self.env.step(ac)


class ClipRewardEnv(gym.RewardWrapper):
    def __init__(self, env):
        gym.RewardWrapper.__init__(self, env)

    def reward(self, reward):
        """Bin reward to {+1, 0, -1} by its sign."""
        return np.sign(reward)


class FireResetEnv(gym.Wrapper):
    def __init__(self, env):
        """Take action on reset.

        For environments that are fixed until firing.
        """
        gym.Wrapper.__init__(self, env)
        assert env.unwrapped.get_action_meanings()[1] == "FIRE"
        assert len(env.unwrapped.get_action_meanings()) >= 3

    def reset(self, **kwargs):
        self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(1)
        if done:
            self.env.reset(**kwargs)
        obs, _, done, _ = self.env.step(2)
        if done:
            self.env.reset(**kwargs)
        return obs

    def step(self, ac):
        return self.env.step(ac)


class EpisodicLifeEnv(gym.Wrapper):
    def __init__(self, env):
        """Make end-of-life == end-of-episode, but only reset on true game
        over. Done by DeepMind for the DQN and co. since it helps value
        estimation.
        """
        gym.Wrapper.__init__(self, env)
        self.lives = 0
        self.was_real_done = True

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        self.was_real_done = done
        # Check current lives, make loss of life terminal, then update lives
        # to handle bonus lives.
        lives = self.env.unwrapped.ale.lives()
        if lives < self.lives and lives > 0:
            # For Q*bert, we sometimes stay in the lives == 0 condition for a
            # few frames, so it's important to keep lives > 0, so that we
            # only reset once the environment advertises done.
            done = True
        self.lives = lives
        return obs, reward, done, info

    def reset(self, **kwargs):
        """Reset only when lives are exhausted.

        This way all states are still reachable even though lives are
        episodic, and the learner need not know about any of this
        behind-the-scenes.
        """
        if self.was_real_done:
            obs = self.env.reset(**kwargs)
        else:
            # No-op step to advance from terminal/lost-life state.
            obs, _, _, _ = self.env.step(0)
        self.lives = self.env.unwrapped.ale.lives()
        return obs


class MaxAndSkipEnv(gym.Wrapper):
    def __init__(self, env, skip=4):
        """Return only every `skip`-th frame."""
        gym.Wrapper.__init__(self, env)
        # Most recent raw observations (for max pooling across time steps).
        self._obs_buffer = np.zeros(
            (2, ) + env.observation_space.shape, dtype=np.uint8)
        self._skip = skip

    def step(self, action):
        """Repeat action, sum reward, and max over last observations."""
        total_reward = 0.0
        done = None
        for i in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            if i == self._skip - 2:
                self._obs_buffer[0] = obs
            if i == self._skip - 1:
                self._obs_buffer[1] = obs
            total_reward += reward
            if done:
                break
        # Note that the observation on the done=True frame doesn't matter.
        max_frame = self._obs_buffer.max(axis=0)
        return max_frame, total_reward, done, info

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)


class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env, dim):
        """Warp frames to the specified size (dim x dim)."""
        gym.ObservationWrapper.__init__(self, env)
        self.width = dim
        self.height = dim
        self.observation_space = spaces.Box(
            low=0,
            high=255,
            shape=(self.height, self.width, 1),
            dtype=np.uint8)

    def observation(self, frame):
        frame = rgb2gray(frame)
        frame = resize(frame, height=self.height, width=self.width)
        return frame[:, :, None]


# TODO: (sven) Deprecated class. Remove once traj. view is the norm.
class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        """Stack k last frames."""
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = spaces.Box(
            low=0,
            high=255,
            shape=(shp[0], shp[1], shp[2] * k),
            dtype=env.observation_space.dtype)

    def reset(self):
        ob = self.env.reset()
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob()

    def step(self, action):
        ob, reward, done, info = self.env.step(action)
        self.frames.append(ob)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        assert len(self.frames) == self.k
        return np.concatenate(self.frames, axis=2)


class FrameStackTrajectoryView(gym.ObservationWrapper):
    def __init__(self, env):
        """No stacking. Trajectory View API takes care of this."""
        gym.ObservationWrapper.__init__(self, env)
        shp = env.observation_space.shape
        assert shp[2] == 1
        self.observation_space = spaces.Box(
            low=0,
            high=255,
            shape=(shp[0], shp[1]),
            dtype=env.observation_space.dtype)

    def observation(self, observation):
        return np.squeeze(observation, axis=-1)


class ScaledFloatFrame(gym.ObservationWrapper):
    def __init__(self, env):
        gym.ObservationWrapper.__init__(self, env)
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=env.observation_space.shape, dtype=np.float32)

    def observation(self, observation):
        # Careful! This undoes the memory optimization; use with smaller
        # replay buffers only.
        return np.array(observation).astype(np.float32) / 255.0


def wrap_deepmind(env, dim=84, framestack=True):
    """Configure environment for DeepMind-style Atari.

    Note that we assume reward clipping is done outside the wrapper.

    Args:
        env (EnvType): The env object to wrap.
        dim (int): Dimension to resize observations to (dim x dim).
        framestack (bool): Whether to framestack observations.
    """
    env = MonitorEnv(env)
    env = NoopResetEnv(env, noop_max=30)
    if env.spec is not None and "NoFrameskip" in env.spec.id:
        env = MaxAndSkipEnv(env, skip=4)
    env = EpisodicLifeEnv(env)
    if "FIRE" in env.unwrapped.get_action_meanings():
        env = FireResetEnv(env)
    env = WarpFrame(env, dim)
    # env = ScaledFloatFrame(env)  # TODO: use for dqn?
    # env = ClipRewardEnv(env)  # reward clipping is handled by policy eval
    # 4x image framestacking.
    if framestack is True:
        env = FrameStack(env, 4)
    return env
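

# A minimal usage sketch (not part of the original module). Assumes a
# standard Gym Atari env such as "PongNoFrameskip-v4" is installed (e.g. via
# `pip install gym[atari]`); the env id is illustrative only.
if __name__ == "__main__":
    env = gym.make("PongNoFrameskip-v4")
    assert is_atari(env)
    env = wrap_deepmind(env, dim=84, framestack=True)
    obs = env.reset()
    # After wrapping: 84x84 grayscale frames, stacked 4x along the channel
    # axis by FrameStack.
    assert obs.shape == (84, 84, 4)
    obs, reward, done, info = env.step(env.action_space.sample())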