123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- """
- DeepMind Control Suite Wrapper directly sourced from:
- https://github.com/denisyarats/dmc2gym
- MIT License
- Copyright (c) 2020 Denis Yarats
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in all
- copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- SOFTWARE.
- """
- from gym import core, spaces
- try:
- from dm_env import specs
- except ImportError:
- specs = None
- try:
- # Suppress MuJoCo warning (dm_control uses absl logging).
- import absl.logging
- absl.logging.set_verbosity("error")
- from dm_control import suite
- except (ImportError, OSError):
- suite = None
- import numpy as np
def _spec_to_box(spec):
    """Build a flat ``spaces.Box`` from an iterable of dm_env array specs.

    Every spec is flattened to one dimension and the per-spec bounds are
    concatenated in iteration order.

    Args:
        spec: Iterable of ``specs.Array`` / ``specs.BoundedArray`` objects
            whose dtype is float32 or float64.

    Returns:
        A float32 ``spaces.Box`` whose ``low`` / ``high`` are the
        concatenated bounds; unbounded specs contribute ``-inf`` / ``+inf``.

    Raises:
        AssertionError: If a spec's dtype is not float32/float64.
        TypeError: If an element of ``spec`` is not an array spec.
    """

    def extract_min_max(s):
        assert s.dtype == np.float64 or s.dtype == np.float32
        # `np.int` was only an alias of the builtin and has been removed from
        # NumPy (1.24+); the builtin works on every NumPy version.
        dim = int(np.prod(s.shape))
        # Checked first because dm_env's BoundedArray subclasses Array.
        if isinstance(s, specs.BoundedArray):
            zeros = np.zeros(dim, dtype=np.float32)
            # Adding a zero vector expands scalar bounds to `dim` entries.
            return s.minimum + zeros, s.maximum + zeros
        if isinstance(s, specs.Array):
            bound = np.inf * np.ones(dim, dtype=np.float32)
            return -bound, bound
        # Previously this fell through and returned None, which surfaced as
        # an opaque "cannot unpack NoneType" TypeError in the caller.
        raise TypeError(
            "Unsupported spec type: {}".format(type(s).__name__))

    mins, maxs = [], []
    for s in spec:
        mn, mx = extract_min_max(s)
        mins.append(mn)
        maxs.append(mx)
    low = np.concatenate(mins, axis=0)
    high = np.concatenate(maxs, axis=0)
    assert low.shape == high.shape
    return spaces.Box(low, high, dtype=np.float32)
def _flatten_obs(obs):
    """Concatenate all values of an observation mapping into one 1-D array.

    Values are visited in the mapping's iteration order. Scalars become
    one-element arrays; everything else is flattened with ``ravel()``.

    Args:
        obs: Mapping whose values are scalars or objects exposing ``ravel()``.

    Returns:
        A one-dimensional ``np.ndarray`` holding every value back to back.
    """

    def as_vector(value):
        # Scalars have no ravel(); wrap them so they can be concatenated.
        if np.isscalar(value):
            return np.array([value])
        return value.ravel()

    return np.concatenate(
        [as_vector(value) for value in obs.values()], axis=0)
class DMCEnv(core.Env):
    """Gym-style environment around a ``dm_control`` suite task.

    Observations are either rendered camera frames (``from_pixels=True``) or
    the task's observation dict flattened into one vector. Actions are
    accepted in the normalized range [-1, 1] and rescaled to the bounds of
    the task's action spec before being applied.
    """

    def __init__(self,
                 domain_name,
                 task_name,
                 task_kwargs=None,
                 visualize_reward=False,
                 from_pixels=False,
                 height=64,
                 width=64,
                 camera_id=0,
                 frame_skip=2,
                 environment_kwargs=None,
                 channels_first=True,
                 preprocess=True):
        """Load the suite task and derive the action/observation spaces.

        Args:
            domain_name: Forwarded to ``suite.load``.
            task_name: Forwarded to ``suite.load``.
            task_kwargs: Forwarded to ``suite.load``.
            visualize_reward: Forwarded to ``suite.load``.
            from_pixels: If true, observations are rendered frames instead
                of the flattened state vector.
            height: Default render height, also the pixel observation height.
            width: Default render width, also the pixel observation width.
            camera_id: Default camera used for rendering.
            frame_skip: Number of underlying env steps per ``step`` call;
                must be at least 1.
            environment_kwargs: Forwarded to ``suite.load``.
            channels_first: If true, pixel observations are laid out as
                (3, height, width), otherwise (height, width, 3).
            preprocess: If true, pixel observations are mapped through
                ``obs / 255.0 - 0.5``.

        Raises:
            RuntimeError: If ``dm_env`` or ``dm_control`` failed to import.
            ValueError: If ``frame_skip`` is smaller than 1.
        """
        self._from_pixels = from_pixels
        self._height = height
        self._width = width
        self._camera_id = camera_id
        self._frame_skip = frame_skip
        self._channels_first = channels_first
        self.preprocess = preprocess

        if specs is None:
            raise RuntimeError((
                "The `specs` module from `dm_env` was not imported. Make sure "
                "`dm_env` is installed and visible in the current python "
                "environment."))
        if suite is None:
            raise RuntimeError(
                ("The `suite` module from `dm_control` was not imported. Make "
                 "sure `dm_control` is installed and visible in the current "
                 "python enviornment."))
        # With zero iterations `step` would never bind `time_step`/`done`
        # and fail with an unrelated UnboundLocalError, so reject it here.
        if frame_skip < 1:
            raise ValueError(
                "`frame_skip` must be >= 1, got {}".format(frame_skip))

        # create task
        self._env = suite.load(
            domain_name=domain_name,
            task_name=task_name,
            task_kwargs=task_kwargs,
            visualize_reward=visualize_reward,
            environment_kwargs=environment_kwargs)

        # true and normalized action spaces
        self._true_action_space = _spec_to_box([self._env.action_spec()])
        self._norm_action_space = spaces.Box(
            low=-1.0,
            high=1.0,
            shape=self._true_action_space.shape,
            dtype=np.float32)

        # create observation space
        if from_pixels:
            shape = [3, height,
                     width] if channels_first else [height, width, 3]
            if preprocess:
                self._observation_space = spaces.Box(
                    low=-0.5, high=0.5, shape=shape, dtype=np.float32)
            else:
                self._observation_space = spaces.Box(
                    low=0, high=255, shape=shape, dtype=np.uint8)
        else:
            self._observation_space = _spec_to_box(
                self._env.observation_spec().values())

        # The state space always describes the flattened (non-pixel) state.
        self._state_space = _spec_to_box(self._env.observation_spec().values())

        self.current_state = None

    def __getattr__(self, name):
        """Delegate attributes missing on the wrapper to the wrapped env.

        Raises:
            AttributeError: If ``_env`` itself has not been set yet.
        """
        # __getattr__ only runs when normal lookup fails. If `_env` is the
        # missing name (instance rebuilt by copy/pickle, or __init__ failed
        # early), touching `self._env` below would re-enter this method
        # forever and end in a RecursionError.
        if name == "_env":
            raise AttributeError(name)
        return getattr(self._env, name)

    def _get_obs(self, time_step):
        """Return the observation for ``time_step`` in the configured mode."""
        if self._from_pixels:
            obs = self.render(
                height=self._height,
                width=self._width,
                camera_id=self._camera_id)
            if self._channels_first:
                # (H, W, C) -> (C, H, W)
                obs = obs.transpose(2, 0, 1).copy()
            if self.preprocess:
                # NOTE(review): presumably the frame is uint8, which makes
                # this result float64 while the declared space is float32 —
                # confirm whether a cast is wanted.
                obs = obs / 255.0 - 0.5
        else:
            obs = _flatten_obs(time_step.observation)
        return obs

    def _convert_action(self, action):
        """Rescale a normalized action to the true action bounds.

        Args:
            action: Array-like action expressed in the normalized space.

        Returns:
            float32 array linearly mapped from the normalized bounds to the
            true action space bounds.
        """
        # asarray (rather than .astype) also accepts lists and tuples.
        action = np.asarray(action, dtype=np.float64)
        true_delta = self._true_action_space.high - self._true_action_space.low
        norm_delta = self._norm_action_space.high - self._norm_action_space.low
        action = (action - self._norm_action_space.low) / norm_delta
        action = action * true_delta + self._true_action_space.low
        action = action.astype(np.float32)
        return action

    @property
    def observation_space(self):
        """Space of the observations returned by ``reset`` and ``step``."""
        return self._observation_space

    @property
    def state_space(self):
        """Space built from the task's observation spec (never pixels)."""
        return self._state_space

    @property
    def action_space(self):
        """Normalized action space with bounds [-1, 1]."""
        return self._norm_action_space

    def step(self, action):
        """Apply ``action`` for up to ``frame_skip`` underlying steps.

        Args:
            action: Action inside the normalized action space.

        Returns:
            Tuple ``(obs, reward, done, extra)``: ``reward`` is summed over
            the executed steps, ``done`` is ``time_step.last()`` of the final
            executed step, and ``extra`` holds ``"internal_state"`` (physics
            state copied before stepping) and ``"discount"``.
        """
        assert self._norm_action_space.contains(action)
        action = self._convert_action(action)
        assert self._true_action_space.contains(action)
        reward = 0
        extra = {"internal_state": self._env.physics.get_state().copy()}

        for _ in range(self._frame_skip):
            time_step = self._env.step(action)
            # The reward may be None; count that as 0.
            reward += time_step.reward or 0
            done = time_step.last()
            if done:
                break
        obs = self._get_obs(time_step)
        self.current_state = _flatten_obs(time_step.observation)
        extra["discount"] = time_step.discount
        return obs, reward, done, extra

    def reset(self):
        """Reset the wrapped env and return the first observation."""
        time_step = self._env.reset()
        self.current_state = _flatten_obs(time_step.observation)
        obs = self._get_obs(time_step)
        return obs

    def render(self, mode="rgb_array", height=None, width=None,
               camera_id=None):
        """Render a frame through the wrapped env's physics.

        Args:
            mode: Must be ``"rgb_array"``.
            height: Frame height; falls back to the constructor value.
            width: Frame width; falls back to the constructor value.
            camera_id: Camera to render from; ``None`` selects the
                constructor value.

        Returns:
            Whatever ``self._env.physics.render`` returns for the frame.
        """
        assert mode == "rgb_array", "only support for rgb_array mode"
        height = height or self._height
        width = width or self._width
        # `camera_id or self._camera_id` treated camera 0 as "not given", so
        # camera 0 could never be requested on an env built with another one.
        if camera_id is None:
            camera_id = self._camera_id
        return self._env.physics.render(
            height=height, width=width, camera_id=camera_id)
|