123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- import gym
- from gym.spaces import Discrete, MultiDiscrete
- import numpy as np
- import tree # pip install dm_tree
- from typing import Any, Callable, List, Optional, Type, TYPE_CHECKING, Union
- from ray.rllib.utils.deprecation import Deprecated
- from ray.rllib.utils.framework import try_import_tf
- from ray.rllib.utils.spaces.space_utils import get_base_struct_from_space
- from ray.rllib.utils.typing import LocalOptimizer, ModelGradients, \
- PartialTrainerConfigDict, TensorStructType, TensorType
- if TYPE_CHECKING:
- from ray.rllib.policy.tf_policy import TFPolicy
- tf1, tf, tfv = try_import_tf()
@Deprecated(new="ray.rllib.utils.numpy.convert_to_numpy()", error=True)
def convert_to_non_tf_type(x: TensorStructType) -> TensorStructType:
    """Converts all tf (eager) tensor leaves in a struct to numpy types.

    Args:
        x: Any (possibly nested) struct. All tf.Tensor/tf.Variable leaves
            in it will be converted via `.numpy()`; other leaves are kept
            unchanged.

    Returns:
        A new struct with the same nesting as `x`, but with every tf
        tensor leaf replaced by its numpy equivalent.
    """

    # Map a single leaf: numpyize tf tensors/variables, pass through
    # everything else.
    def _to_numpy(leaf):
        return leaf.numpy() if isinstance(leaf, (tf.Tensor, tf.Variable)) \
            else leaf

    return tree.map_structure(_to_numpy, x)
def explained_variance(y: TensorType, pred: TensorType) -> TensorType:
    """Computes the explained variance for a pair of labels and predictions.

    The formula used is:
    max(-1.0, 1.0 - (std(y - pred)^2 / std(y)^2))

    Args:
        y: The labels.
        pred: The predictions.

    Returns:
        The explained variance given a pair of labels and predictions.
    """
    # Only the variances (2nd return values of `moments`) are needed.
    y_var = tf.nn.moments(y, axes=[0])[1]
    residual_var = tf.nn.moments(y - pred, axes=[0])[1]
    # Clip at -1.0 from below (arbitrarily bad fits all map to -1.0).
    return tf.maximum(-1.0, 1 - (residual_var / y_var))
def get_gpu_devices() -> List[str]:
    """Returns a list of GPU device names, e.g. ["/gpu:0", "/gpu:1"].

    Supports both tf1.x and tf2.x.

    Returns:
        List of GPU device names (str).
    """
    if tfv == 1:
        # tf1.x: Use the low-level device_lib API.
        from tensorflow.python.client import device_lib
        local_devices = device_lib.list_local_devices()
    else:
        # tf2.x: Prefer the stable API; fall back to the experimental one
        # on older tf2 versions.
        try:
            local_devices = tf.config.list_physical_devices()
        except Exception:
            local_devices = tf.config.experimental.list_physical_devices()

    # Expect "GPU", but also stuff like: "XLA_GPU".
    return [d.name for d in local_devices if "GPU" in d.device_type]
def get_placeholder(*,
                    space: Optional[gym.Space] = None,
                    value: Optional[Any] = None,
                    name: Optional[str] = None,
                    time_axis: bool = False,
                    flatten: bool = True) -> "tf1.placeholder":
    """Returns a tf1.placeholder object given optional hints, such as a space.

    Note that the returned placeholder will always have a leading batch
    dimension (None).

    Args:
        space: An optional gym.Space to hint the shape and dtype of the
            placeholder.
        value: An optional value to hint the shape and dtype of the
            placeholder.
        name: An optional name for the placeholder.
        time_axis: Whether the placeholder should also receive a time
            dimension (None).
        flatten: Whether to flatten the given space into a plain Box space
            and then create the placeholder from the resulting space.

    Returns:
        The tf1 placeholder (or - for a Dict/Tuple space with
        flatten=False - a struct of placeholders matching the space's
        structure).
    """
    from ray.rllib.models.catalog import ModelCatalog

    if space is not None:
        if isinstance(space, (gym.spaces.Dict, gym.spaces.Tuple)):
            if flatten:
                return ModelCatalog.get_action_placeholder(space, None)
            else:
                return tree.map_structure_with_path(
                    lambda path, component: get_placeholder(
                        space=component,
                        # Fix: Don't crash with a TypeError if `name` is
                        # None; use a generic prefix instead.
                        name=(name or "placeholder") + "." + ".".join(
                            [str(p) for p in path]),
                        # Fix: Forward `time_axis` so nested components
                        # also receive the extra time dimension.
                        time_axis=time_axis,
                    ),
                    get_base_struct_from_space(space),
                )
        return tf1.placeholder(
            shape=(None, ) + ((None, ) if time_axis else ()) + space.shape,
            # Downcast float64 spaces to RLlib's standard float32.
            dtype=tf.float32 if space.dtype == np.float64 else space.dtype,
            name=name,
        )
    else:
        assert value is not None
        # Drop the (existing) batch dim; it is re-added as None below.
        shape = value.shape[1:]
        return tf1.placeholder(
            shape=(None, ) + ((None, )
                              if time_axis else ()) + (shape if isinstance(
                                  shape, tuple) else tuple(shape.as_list())),
            dtype=tf.float32 if value.dtype == np.float64 else value.dtype,
            name=name,
        )
def get_tf_eager_cls_if_necessary(
        orig_cls: Type["TFPolicy"],
        config: PartialTrainerConfigDict) -> Type["TFPolicy"]:
    """Returns the corresponding tf-eager class for a given TFPolicy class.

    Args:
        orig_cls: The original TFPolicy class to get the corresponding tf-eager
            class for.
        config: The Trainer config dict.

    Returns:
        The tf eager policy class corresponding to the given TFPolicy class.
    """
    framework = config.get("framework", "tf")

    # Non-tf frameworks: Return the class unchanged.
    if framework not in ["tf2", "tf", "tfe"]:
        return orig_cls

    if not tf1:
        raise ImportError("Could not import tensorflow!")

    # Static-graph tf: No eager conversion necessary.
    if framework == "tf":
        return orig_cls

    # tf2/tfe: Eager mode must already be active at this point.
    assert tf1.executing_eagerly()

    from ray.rllib.policy.tf_policy import TFPolicy

    # Create eager-class (and optionally a traced variant thereof).
    if hasattr(orig_cls, "as_eager"):
        eager_cls = orig_cls.as_eager()
        if config.get("eager_tracing"):
            eager_cls = eager_cls.with_tracing()
        return eager_cls

    # Could be some other type of policy: Leave as-is.
    if not issubclass(orig_cls, TFPolicy):
        return orig_cls

    raise ValueError("This policy does not support eager "
                     "execution: {}".format(orig_cls))
def huber_loss(x: TensorType, delta: float = 1.0) -> TensorType:
    """Computes the huber loss for a given term and delta parameter.

    Reference: https://en.wikipedia.org/wiki/Huber_loss
    Note that the factor of 0.5 is implicitly included in the calculation.

    Formula:
        L = 0.5 * x^2 for small abs x (delta threshold)
        L = delta * (abs(x) - 0.5*delta) for larger abs x (delta threshold)

    Args:
        x: The input term, e.g. a TD error.
        delta: The delta parmameter in the above formula.

    Returns:
        The Huber loss resulting from `x` and `delta`.
    """
    abs_x = tf.abs(x)
    # Quadratic branch for |x| < delta; linear branch beyond.
    quadratic = tf.math.square(x) * 0.5
    linear = delta * (abs_x - 0.5 * delta)
    return tf.where(abs_x < delta, quadratic, linear)
def make_tf_callable(session_or_none: Optional["tf1.Session"],
                     dynamic_shape: bool = False) -> Callable:
    """Returns a function that can be executed in either graph or eager mode.

    The function must take only positional args.

    If eager is enabled, this will act as just a function. Otherwise, it
    will build a function that executes a session run with placeholders
    internally.

    Args:
        session_or_none: tf.Session if in graph mode, else None.
        dynamic_shape: True if the placeholders should have a dynamic
            batch dimension. Otherwise they will be fixed shape.

    Returns:
        A function that can be called in either eager or static-graph mode.
    """
    # The session's presence/absence must match the current execution mode.
    if tf.executing_eagerly():
        assert session_or_none is None
    else:
        assert session_or_none is not None

    def make_wrapper(fn):
        # Static-graph mode: Create placeholders and make a session call each
        # time the wrapped function is called. Returns the output of this
        # session call.
        if session_or_none is not None:
            args_placeholders = []
            kwargs_placeholders = {}

            # One-element list used as a mutable cell: the inner `call`
            # closure caches `fn`'s symbolic output here after the first
            # call.
            symbolic_out = [None]

            def call(*args, **kwargs):
                # Flatten one level of list-valued positional args into a
                # single flat args list.
                args_flat = []
                for a in args:
                    if type(a) is list:
                        args_flat.extend(a)
                    else:
                        args_flat.append(a)
                args = args_flat

                # We have not built any placeholders yet: Do this once here,
                # then reuse the same placeholders each time we call this
                # function again.
                if symbolic_out[0] is None:
                    with session_or_none.graph.as_default():

                        # Build one placeholder mirroring a single input
                        # leaf's dtype and shape.
                        def _create_placeholders(path, value):
                            if dynamic_shape:
                                if len(value.shape) > 0:
                                    # Make the batch (0th) dim dynamic.
                                    shape = (None, ) + value.shape[1:]
                                else:
                                    shape = ()
                            else:
                                shape = value.shape
                            return tf1.placeholder(
                                dtype=value.dtype,
                                shape=shape,
                                name=".".join([str(p) for p in path]),
                            )

                        # Placeholders for all positional args ...
                        placeholders = tree.map_structure_with_path(
                            _create_placeholders, args)
                        for ph in tree.flatten(placeholders):
                            args_placeholders.append(ph)

                        # ... and for all keyword args.
                        placeholders = tree.map_structure_with_path(
                            _create_placeholders, kwargs)
                        for k, ph in placeholders.items():
                            kwargs_placeholders[k] = ph

                        # Trace `fn` once over the placeholders to obtain
                        # the symbolic output(s) to run below.
                        symbolic_out[0] = fn(*args_placeholders,
                                             **kwargs_placeholders)

                # Feed actual values into the (cached) placeholders and run
                # the cached symbolic output in the given session.
                feed_dict = dict(zip(args_placeholders, tree.flatten(args)))
                tree.map_structure(lambda ph, v: feed_dict.__setitem__(ph, v),
                                   kwargs_placeholders, kwargs)
                ret = session_or_none.run(symbolic_out[0], feed_dict)
                return ret

            return call
        # Eager mode (call function as is).
        else:
            return fn

    return make_wrapper
def minimize_and_clip(
        optimizer: LocalOptimizer,
        objective: TensorType,
        var_list: List["tf.Variable"],
        clip_val: float = 10.0,
) -> ModelGradients:
    """Computes, then clips gradients using objective, optimizer and var list.

    Ensures the norm of the gradients for each variable is clipped to
    `clip_val`.

    Args:
        optimizer: Either a shim optimizer (tf eager) containing a
            tf.GradientTape under `self.tape` or a tf1 local optimizer
            object.
        objective: The loss tensor to calculate gradients on.
        var_list: The list of tf.Variables to compute gradients over.
        clip_val: The global norm clip value. Will clip around -clip_val and
            +clip_val.

    Returns:
        The resulting model gradients (list or tuples of grads + vars)
        corresponding to the input `var_list`.
    """
    # Accidentally passing values < 0.0 will break all gradients.
    assert clip_val is None or clip_val > 0.0, clip_val

    if tf.executing_eagerly():
        # Eager: Gradients come from the shim optimizer's GradientTape.
        grads = list(optimizer.tape.gradient(objective, var_list))
        grads_and_vars = list(zip(grads, var_list))
    else:
        # Graph mode: Use the tf1 optimizer API.
        grads_and_vars = optimizer.compute_gradients(
            objective, var_list=var_list)

    # Drop vars without gradients; clip the rest (if clipping requested).
    clipped = []
    for grad, var in grads_and_vars:
        if grad is None:
            continue
        if clip_val is not None:
            grad = tf.clip_by_norm(grad, clip_val)
        clipped.append((grad, var))
    return clipped
def one_hot(x: TensorType, space: gym.Space) -> TensorType:
    """Returns a one-hot tensor, given and int tensor and a space.

    Handles the MultiDiscrete case as well.

    Args:
        x: The input tensor.
        space: The space to use for generating the one-hot tensor.

    Returns:
        The resulting one-hot tensor.

    Raises:
        ValueError: If the given space is not a discrete one.

    Examples:
        >>> x = tf.Variable([0, 3], dtype=tf.int32)  # batch-dim=2
        >>> # Discrete space with 4 (one-hot) slots per batch item.
        >>> s = gym.spaces.Discrete(4)
        >>> one_hot(x, s)
        <tf.Tensor 'one_hot:0' shape=(2, 4) dtype=float32>

        >>> x = tf.Variable([[0, 1, 2, 3]], dtype=tf.int32)  # batch-dim=1
        >>> # MultiDiscrete space with 5 + 4 + 4 + 7 = 20 (one-hot) slots
        >>> # per batch item.
        >>> s = gym.spaces.MultiDiscrete([5, 4, 4, 7])
        >>> one_hot(x, s)
        <tf.Tensor 'concat:0' shape=(1, 20) dtype=float32>
    """
    if isinstance(space, Discrete):
        return tf.one_hot(x, space.n, dtype=tf.float32)

    if isinstance(space, MultiDiscrete):
        # One-hot each sub-discrete column separately, then concatenate
        # along the last axis.
        sub_one_hots = [
            tf.one_hot(x[:, idx], n, dtype=tf.float32)
            for idx, n in enumerate(space.nvec)
        ]
        return tf.concat(sub_one_hots, axis=-1)

    raise ValueError("Unsupported space for `one_hot`: {}".format(space))
def reduce_mean_ignore_inf(x: TensorType,
                           axis: Optional[int] = None) -> TensorType:
    """Same as tf.reduce_mean() but ignores -inf values.

    Entries equal to `tf.float32.min` (the value RLlib uses as a
    -inf-like mask) are excluded from both the sum and the count.

    Args:
        x: The input tensor to reduce mean over.
        axis: The axis over which to reduce. None for all axes.

    Returns:
        The mean reduced inputs, ignoring inf values.
    """
    valid = tf.not_equal(x, tf.float32.min)
    # Zero-out masked entries so they don't contribute to the sum.
    masked = tf.where(valid, x, tf.zeros_like(x))
    total = tf.math.reduce_sum(masked, axis)
    count = tf.math.reduce_sum(tf.cast(valid, tf.float32), axis)
    return total / count
def scope_vars(scope: Union[str, "tf1.VariableScope"],
               trainable_only: bool = False) -> List["tf.Variable"]:
    """Get variables inside a given scope.

    Args:
        scope: Scope in which the variables reside.
        trainable_only: Whether or not to return only the variables that were
            marked as trainable.

    Returns:
        The list of variables in the given `scope`.
    """
    # Pick the graph collection to search.
    collection = (tf1.GraphKeys.TRAINABLE_VARIABLES
                  if trainable_only else tf1.GraphKeys.VARIABLES)
    # Accept either a scope name (str) or a VariableScope object.
    scope_name = scope if isinstance(scope, str) else scope.name
    return tf1.get_collection(collection, scope=scope_name)
def zero_logps_from_actions(actions: TensorStructType) -> TensorType:
    """Helper function useful for returning dummy logp's (0) for some actions.

    Args:
        actions: The input actions. This can be any struct
            of complex action components or a simple tensor of different
            dimensions, e.g. [B], [B, 2], or {"a": [B, 4, 5], "b": [B]}.

    Returns:
        A 1D tensor of 0.0 (dummy logp's) matching the batch
        dim of `actions` (shape=[B]).
    """
    # Flatten `actions` (it may be a complex struct) and use the 0th
    # component to extract the batch dim.
    first_component = tree.flatten(actions)[0]
    logps = tf.zeros_like(first_component, dtype=tf.float32)
    # Logp's should be single values (but with the same batch dim as
    # `deterministic_actions` or `stochastic_actions`). In case
    # actions are just [B], zeros_like works just fine here, but if
    # actions are [B, ...], we have to reduce logp back to just [B].
    while len(logps.shape) > 1:
        logps = logps[:, 0]
    return logps
|