123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- import numpy as np
- from typing import Optional, Union
- from ray.rllib.models.action_dist import ActionDistribution
- from ray.rllib.utils.annotations import override, PublicAPI
- from ray.rllib.utils.exploration.gaussian_noise import GaussianNoise
- from ray.rllib.utils.framework import (
- try_import_tf,
- try_import_torch,
- get_variable,
- TensorType,
- )
- from ray.rllib.utils.numpy import convert_to_numpy
- from ray.rllib.utils.schedules import Schedule
- from ray.rllib.utils.tf_utils import zero_logps_from_actions
- tf1, tf, tfv = try_import_tf()
- torch, _ = try_import_torch()
- @PublicAPI
- class OrnsteinUhlenbeckNoise(GaussianNoise):
- """An exploration that adds Ornstein-Uhlenbeck noise to continuous actions.
- If explore=True, returns sampled actions plus a noise term X,
- which changes according to this formula:
- Xt+1 = -theta*Xt + sigma*N[0,stddev], where theta, sigma and stddev are
- constants. Also, some completely random period is possible at the
- beginning.
- If explore=False, returns the deterministic action.
- """
- def __init__(
- self,
- action_space,
- *,
- framework: str,
- ou_theta: float = 0.15,
- ou_sigma: float = 0.2,
- ou_base_scale: float = 0.1,
- random_timesteps: int = 1000,
- initial_scale: float = 1.0,
- final_scale: float = 0.02,
- scale_timesteps: int = 10000,
- scale_schedule: Optional[Schedule] = None,
- **kwargs
- ):
- """Initializes an Ornstein-Uhlenbeck Exploration object.
- Args:
- action_space: The gym action space used by the environment.
- ou_theta: The theta parameter of the Ornstein-Uhlenbeck process.
- ou_sigma: The sigma parameter of the Ornstein-Uhlenbeck process.
- ou_base_scale: A fixed scaling factor, by which all OU-
- noise is multiplied. NOTE: This is on top of the parent
- GaussianNoise's scaling.
- random_timesteps: The number of timesteps for which to act
- completely randomly. Only after this number of timesteps, the
- `self.scale` annealing process will start (see below).
- initial_scale: The initial scaling weight to multiply the
- noise with.
- final_scale: The final scaling weight to multiply the noise with.
- scale_timesteps: The timesteps over which to linearly anneal the
- scaling factor (after(!) having used random actions for
- `random_timesteps` steps.
- scale_schedule: An optional Schedule object to use (instead
- of constructing one from the given parameters).
- framework: One of None, "tf", "torch".
- """
- # The current OU-state value (gets updated each time, an eploration
- # action is computed).
- self.ou_state = get_variable(
- np.array(action_space.low.size * [0.0], dtype=np.float32),
- framework=framework,
- tf_name="ou_state",
- torch_tensor=True,
- device=None,
- )
- super().__init__(
- action_space,
- framework=framework,
- random_timesteps=random_timesteps,
- initial_scale=initial_scale,
- final_scale=final_scale,
- scale_timesteps=scale_timesteps,
- scale_schedule=scale_schedule,
- stddev=1.0, # Force `self.stddev` to 1.0.
- **kwargs
- )
- self.ou_theta = ou_theta
- self.ou_sigma = ou_sigma
- self.ou_base_scale = ou_base_scale
- # Now that we know the device, move ou_state there, in case of PyTorch.
- if self.framework == "torch" and self.device is not None:
- self.ou_state = self.ou_state.to(self.device)
- @override(GaussianNoise)
- def _get_tf_exploration_action_op(
- self,
- action_dist: ActionDistribution,
- explore: Union[bool, TensorType],
- timestep: Union[int, TensorType],
- ):
- ts = timestep if timestep is not None else self.last_timestep
- scale = self.scale_schedule(ts)
- # The deterministic actions (if explore=False).
- deterministic_actions = action_dist.deterministic_sample()
- # Apply base-scaled and time-annealed scaled OU-noise to
- # deterministic actions.
- gaussian_sample = tf.random.normal(
- shape=[self.action_space.low.size], stddev=self.stddev
- )
- ou_new = self.ou_theta * -self.ou_state + self.ou_sigma * gaussian_sample
- if self.framework == "tf2":
- self.ou_state.assign_add(ou_new)
- ou_state_new = self.ou_state
- else:
- ou_state_new = tf1.assign_add(self.ou_state, ou_new)
- high_m_low = self.action_space.high - self.action_space.low
- high_m_low = tf.where(
- tf.math.is_inf(high_m_low), tf.ones_like(high_m_low), high_m_low
- )
- noise = scale * self.ou_base_scale * ou_state_new * high_m_low
- stochastic_actions = tf.clip_by_value(
- deterministic_actions + noise,
- self.action_space.low * tf.ones_like(deterministic_actions),
- self.action_space.high * tf.ones_like(deterministic_actions),
- )
- # Stochastic actions could either be: random OR action + noise.
- random_actions, _ = self.random_exploration.get_tf_exploration_action_op(
- action_dist, explore
- )
- exploration_actions = tf.cond(
- pred=tf.convert_to_tensor(ts < self.random_timesteps),
- true_fn=lambda: random_actions,
- false_fn=lambda: stochastic_actions,
- )
- # Chose by `explore` (main exploration switch).
- action = tf.cond(
- pred=tf.constant(explore, dtype=tf.bool)
- if isinstance(explore, bool)
- else explore,
- true_fn=lambda: exploration_actions,
- false_fn=lambda: deterministic_actions,
- )
- # Logp=always zero.
- logp = zero_logps_from_actions(deterministic_actions)
- # Increment `last_timestep` by 1 (or set to `timestep`).
- if self.framework == "tf2":
- if timestep is None:
- self.last_timestep.assign_add(1)
- else:
- self.last_timestep.assign(tf.cast(timestep, tf.int64))
- else:
- assign_op = (
- tf1.assign_add(self.last_timestep, 1)
- if timestep is None
- else tf1.assign(self.last_timestep, timestep)
- )
- with tf1.control_dependencies([assign_op, ou_state_new]):
- action = tf.identity(action)
- logp = tf.identity(logp)
- return action, logp
- @override(GaussianNoise)
- def _get_torch_exploration_action(
- self,
- action_dist: ActionDistribution,
- explore: bool,
- timestep: Union[int, TensorType],
- ):
- # Set last timestep or (if not given) increase by one.
- self.last_timestep = (
- timestep if timestep is not None else self.last_timestep + 1
- )
- # Apply exploration.
- if explore:
- # Random exploration phase.
- if self.last_timestep < self.random_timesteps:
- action, _ = self.random_exploration.get_torch_exploration_action(
- action_dist, explore=True
- )
- # Apply base-scaled and time-annealed scaled OU-noise to
- # deterministic actions.
- else:
- det_actions = action_dist.deterministic_sample()
- scale = self.scale_schedule(self.last_timestep)
- gaussian_sample = scale * torch.normal(
- mean=torch.zeros(self.ou_state.size()), std=1.0
- ).to(self.device)
- ou_new = (
- self.ou_theta * -self.ou_state + self.ou_sigma * gaussian_sample
- )
- self.ou_state += ou_new
- high_m_low = torch.from_numpy(
- self.action_space.high - self.action_space.low
- ).to(self.device)
- high_m_low = torch.where(
- torch.isinf(high_m_low),
- torch.ones_like(high_m_low).to(self.device),
- high_m_low,
- )
- noise = scale * self.ou_base_scale * self.ou_state * high_m_low
- action = torch.min(
- torch.max(
- det_actions + noise,
- torch.tensor(
- self.action_space.low,
- dtype=torch.float32,
- device=self.device,
- ),
- ),
- torch.tensor(
- self.action_space.high, dtype=torch.float32, device=self.device
- ),
- )
- # No exploration -> Return deterministic actions.
- else:
- action = action_dist.deterministic_sample()
- # Logp=always zero.
- logp = torch.zeros((action.size()[0],), dtype=torch.float32, device=self.device)
- return action, logp
- @override(GaussianNoise)
- def get_state(self, sess: Optional["tf.Session"] = None):
- """Returns the current scale value.
- Returns:
- Union[float,tf.Tensor[float]]: The current scale value.
- """
- if sess:
- return sess.run(
- dict(
- self._tf_state_op,
- **{
- "ou_state": self.ou_state,
- }
- )
- )
- state = super().get_state()
- return dict(
- state,
- **{
- "ou_state": convert_to_numpy(self.ou_state)
- if self.framework != "tf"
- else self.ou_state,
- }
- )
- @override(GaussianNoise)
- def set_state(self, state: dict, sess: Optional["tf.Session"] = None) -> None:
- if self.framework == "tf":
- self.ou_state.load(state["ou_state"], session=sess)
- elif isinstance(self.ou_state, np.ndarray):
- self.ou_state = state["ou_state"]
- elif torch and torch.is_tensor(self.ou_state):
- self.ou_state = torch.from_numpy(state["ou_state"])
- else:
- self.ou_state.assign(state["ou_state"])
- super().set_state(state, sess=sess)
|