123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- import logging
- import gym
- import numpy as np
- from typing import Callable, List, Optional, Tuple, Union
- from ray.rllib.env.base_env import BaseEnv
- from ray.rllib.utils.annotations import Deprecated, override, PublicAPI
- from ray.rllib.utils.typing import EnvActionType, EnvID, EnvInfoDict, \
- EnvObsType, EnvType, MultiEnvDict
- logger = logging.getLogger(__name__)
- @PublicAPI
- class VectorEnv:
- """An environment that supports batch evaluation using clones of sub-envs.
- """
- def __init__(self, observation_space: gym.Space, action_space: gym.Space,
- num_envs: int):
- """Initializes a VectorEnv instance.
- Args:
- observation_space: The observation Space of a single
- sub-env.
- action_space: The action Space of a single sub-env.
- num_envs: The number of clones to make of the given sub-env.
- """
- self.observation_space = observation_space
- self.action_space = action_space
- self.num_envs = num_envs
- @staticmethod
- def vectorize_gym_envs(
- make_env: Optional[Callable[[int], EnvType]] = None,
- existing_envs: Optional[List[gym.Env]] = None,
- num_envs: int = 1,
- action_space: Optional[gym.Space] = None,
- observation_space: Optional[gym.Space] = None,
- # Deprecated. These seem to have never been used.
- env_config=None,
- policy_config=None) -> "_VectorizedGymEnv":
- """Translates any given gym.Env(s) into a VectorizedEnv object.
- Args:
- make_env: Factory that produces a new gym.Env taking the sub-env's
- vector index as only arg. Must be defined if the
- number of `existing_envs` is less than `num_envs`.
- existing_envs: Optional list of already instantiated sub
- environments.
- num_envs: Total number of sub environments in this VectorEnv.
- action_space: The action space. If None, use existing_envs[0]'s
- action space.
- observation_space: The observation space. If None, use
- existing_envs[0]'s action space.
- Returns:
- The resulting _VectorizedGymEnv object (subclass of VectorEnv).
- """
- return _VectorizedGymEnv(
- make_env=make_env,
- existing_envs=existing_envs or [],
- num_envs=num_envs,
- observation_space=observation_space,
- action_space=action_space,
- )
- @PublicAPI
- def vector_reset(self) -> List[EnvObsType]:
- """Resets all sub-environments.
- Returns:
- List of observations from each environment.
- """
- raise NotImplementedError
- @PublicAPI
- def reset_at(self, index: Optional[int] = None) -> EnvObsType:
- """Resets a single environment.
- Args:
- index: An optional sub-env index to reset.
- Returns:
- Observations from the reset sub environment.
- """
- raise NotImplementedError
- @PublicAPI
- def vector_step(
- self, actions: List[EnvActionType]
- ) -> Tuple[List[EnvObsType], List[float], List[bool], List[EnvInfoDict]]:
- """Performs a vectorized step on all sub environments using `actions`.
- Args:
- actions: List of actions (one for each sub-env).
- Returns:
- A tuple consisting of
- 1) New observations for each sub-env.
- 2) Reward values for each sub-env.
- 3) Done values for each sub-env.
- 4) Info values for each sub-env.
- """
- raise NotImplementedError
- @PublicAPI
- def get_sub_environments(self) -> List[EnvType]:
- """Returns the underlying sub environments.
- Returns:
- List of all underlying sub environments.
- """
- return []
- # TODO: (sven) Experimental method. Make @PublicAPI at some point.
- def try_render_at(self, index: Optional[int] = None) -> \
- Optional[np.ndarray]:
- """Renders a single environment.
- Args:
- index: An optional sub-env index to render.
- Returns:
- Either a numpy RGB image (shape=(w x h x 3) dtype=uint8) or
- None in case rendering is handled directly by this method.
- """
- pass
- @Deprecated(new="vectorize_gym_envs", error=False)
- def wrap(self, *args, **kwargs) -> "_VectorizedGymEnv":
- return self.vectorize_gym_envs(*args, **kwargs)
- @Deprecated(new="get_sub_environments", error=False)
- def get_unwrapped(self) -> List[EnvType]:
- return self.get_sub_environments()
- @PublicAPI
- def to_base_env(
- self,
- make_env: Optional[Callable[[int], EnvType]] = None,
- num_envs: int = 1,
- remote_envs: bool = False,
- remote_env_batch_wait_ms: int = 0,
- ) -> "BaseEnv":
- """Converts an RLlib MultiAgentEnv into a BaseEnv object.
- The resulting BaseEnv is always vectorized (contains n
- sub-environments) to support batched forward passes, where n may
- also be 1. BaseEnv also supports async execution via the `poll` and
- `send_actions` methods and thus supports external simulators.
- Args:
- make_env: A callable taking an int as input (which indicates
- the number of individual sub-environments within the final
- vectorized BaseEnv) and returning one individual
- sub-environment.
- num_envs: The number of sub-environments to create in the
- resulting (vectorized) BaseEnv. The already existing `env`
- will be one of the `num_envs`.
- remote_envs: Whether each sub-env should be a @ray.remote
- actor. You can set this behavior in your config via the
- `remote_worker_envs=True` option.
- remote_env_batch_wait_ms: The wait time (in ms) to poll remote
- sub-environments for, if applicable. Only used if
- `remote_envs` is True.
- Returns:
- The resulting BaseEnv object.
- """
- del make_env, num_envs, remote_envs, remote_env_batch_wait_ms
- env = VectorEnvWrapper(self)
- return env
- class _VectorizedGymEnv(VectorEnv):
- """Internal wrapper to translate any gym.Envs into a VectorEnv object.
- """
- def __init__(
- self,
- make_env: Optional[Callable[[int], EnvType]] = None,
- existing_envs: Optional[List[gym.Env]] = None,
- num_envs: int = 1,
- *,
- observation_space: Optional[gym.Space] = None,
- action_space: Optional[gym.Space] = None,
- # Deprecated. These seem to have never been used.
- env_config=None,
- policy_config=None,
- ):
- """Initializes a _VectorizedGymEnv object.
- Args:
- make_env: Factory that produces a new gym.Env taking the sub-env's
- vector index as only arg. Must be defined if the
- number of `existing_envs` is less than `num_envs`.
- existing_envs: Optional list of already instantiated sub
- environments.
- num_envs: Total number of sub environments in this VectorEnv.
- action_space: The action space. If None, use existing_envs[0]'s
- action space.
- observation_space: The observation space. If None, use
- existing_envs[0]'s action space.
- """
- self.envs = existing_envs
- # Fill up missing envs (so we have exactly num_envs sub-envs in this
- # VectorEnv.
- while len(self.envs) < num_envs:
- self.envs.append(make_env(len(self.envs)))
- super().__init__(
- observation_space=observation_space
- or self.envs[0].observation_space,
- action_space=action_space or self.envs[0].action_space,
- num_envs=num_envs)
- @override(VectorEnv)
- def vector_reset(self):
- return [e.reset() for e in self.envs]
- @override(VectorEnv)
- def reset_at(self, index: Optional[int] = None) -> EnvObsType:
- if index is None:
- index = 0
- return self.envs[index].reset()
- @override(VectorEnv)
- def vector_step(self, actions):
- obs_batch, rew_batch, done_batch, info_batch = [], [], [], []
- for i in range(self.num_envs):
- obs, r, done, info = self.envs[i].step(actions[i])
- if not np.isscalar(r) or not np.isreal(r) or not np.isfinite(r):
- raise ValueError(
- "Reward should be finite scalar, got {} ({}). "
- "Actions={}.".format(r, type(r), actions[i]))
- if not isinstance(info, dict):
- raise ValueError("Info should be a dict, got {} ({})".format(
- info, type(info)))
- obs_batch.append(obs)
- rew_batch.append(r)
- done_batch.append(done)
- info_batch.append(info)
- return obs_batch, rew_batch, done_batch, info_batch
- @override(VectorEnv)
- def get_sub_environments(self):
- return self.envs
- @override(VectorEnv)
- def try_render_at(self, index: Optional[int] = None):
- if index is None:
- index = 0
- return self.envs[index].render()
- class VectorEnvWrapper(BaseEnv):
- """Internal adapter of VectorEnv to BaseEnv.
- We assume the caller will always send the full vector of actions in each
- call to send_actions(), and that they call reset_at() on all completed
- environments before calling send_actions().
- """
- def __init__(self, vector_env: VectorEnv):
- self.vector_env = vector_env
- self.num_envs = vector_env.num_envs
- self.new_obs = None # lazily initialized
- self.cur_rewards = [None for _ in range(self.num_envs)]
- self.cur_dones = [False for _ in range(self.num_envs)]
- self.cur_infos = [None for _ in range(self.num_envs)]
- self._observation_space = vector_env.observation_space
- self._action_space = vector_env.action_space
- @override(BaseEnv)
- def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,
- MultiEnvDict, MultiEnvDict]:
- from ray.rllib.env.base_env import with_dummy_agent_id
- if self.new_obs is None:
- self.new_obs = self.vector_env.vector_reset()
- new_obs = dict(enumerate(self.new_obs))
- rewards = dict(enumerate(self.cur_rewards))
- dones = dict(enumerate(self.cur_dones))
- infos = dict(enumerate(self.cur_infos))
- self.new_obs = []
- self.cur_rewards = []
- self.cur_dones = []
- self.cur_infos = []
- return with_dummy_agent_id(new_obs), \
- with_dummy_agent_id(rewards), \
- with_dummy_agent_id(dones, "__all__"), \
- with_dummy_agent_id(infos), {}
- @override(BaseEnv)
- def send_actions(self, action_dict: MultiEnvDict) -> None:
- from ray.rllib.env.base_env import _DUMMY_AGENT_ID
- action_vector = [None] * self.num_envs
- for i in range(self.num_envs):
- action_vector[i] = action_dict[i][_DUMMY_AGENT_ID]
- self.new_obs, self.cur_rewards, self.cur_dones, self.cur_infos = \
- self.vector_env.vector_step(action_vector)
- @override(BaseEnv)
- def try_reset(self, env_id: Optional[EnvID] = None) -> MultiEnvDict:
- from ray.rllib.env.base_env import _DUMMY_AGENT_ID
- assert env_id is None or isinstance(env_id, int)
- return {
- env_id if env_id is not None else 0: {
- _DUMMY_AGENT_ID: self.vector_env.reset_at(env_id)
- }
- }
- @override(BaseEnv)
- def get_sub_environments(
- self, as_dict: bool = False) -> Union[List[EnvType], dict]:
- if not as_dict:
- return self.vector_env.get_sub_environments()
- else:
- return {
- _id: env
- for _id, env in enumerate(
- self.vector_env.get_sub_environments())
- }
- @override(BaseEnv)
- def try_render(self, env_id: Optional[EnvID] = None) -> None:
- assert env_id is None or isinstance(env_id, int)
- return self.vector_env.try_render_at(env_id)
- @property
- @override(BaseEnv)
- @PublicAPI
- def observation_space(self) -> gym.spaces.Dict:
- return self._observation_space
- @property
- @override(BaseEnv)
- @PublicAPI
- def action_space(self) -> gym.Space:
- return self._action_space
|