import numpy as np
import gym
from typing import Dict, Optional, Sequence

from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.utils import get_activation_fn
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.typing import TensorType, List, ModelConfigDict

tf1, tf, tfv = try_import_tf()
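# Note: `try_import_tf()` yields the tf1 compat module, the full `tf` module,
# and the installed major version; `tf` is None if TensorFlow is unavailable
# (hence the `tf.keras.Model if tf else object` base class further below).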


# TODO: (sven) obsolete this class once we only support native keras models.
class FullyConnectedNetwork(TFModelV2):
    """Generic fully connected network implemented in ModelV2 API."""

    def __init__(self, obs_space: gym.spaces.Space,
                 action_space: gym.spaces.Space, num_outputs: int,
                 model_config: ModelConfigDict, name: str):
        super(FullyConnectedNetwork, self).__init__(
            obs_space, action_space, num_outputs, model_config, name)

        hiddens = list(model_config.get("fcnet_hiddens", [])) + \
            list(model_config.get("post_fcnet_hiddens", []))
        activation = model_config.get("fcnet_activation")
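        # With no "main" FC stack, fall back to the post-FC-stack activation.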
        if not model_config.get("fcnet_hiddens", []):
            activation = model_config.get("post_fcnet_activation")
        activation = get_activation_fn(activation)
        no_final_linear = model_config.get("no_final_linear")
        vf_share_layers = model_config.get("vf_share_layers")
        free_log_std = model_config.get("free_log_std")

        # Generate free-floating bias variables for the second half of
        # the outputs.
        if free_log_std:
            assert num_outputs % 2 == 0, (
                "num_outputs must be divisible by two", num_outputs)
            num_outputs = num_outputs // 2
            self.log_std_var = tf.Variable(
                [0.0] * num_outputs, dtype=tf.float32, name="log_std")
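            # E.g. a diagonal-Gaussian head asked for num_outputs=8 now only
            # gets 4 state-dependent means from the net; the 4 log-stds live
            # in `self.log_std_var` and are appended to the output below.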

        # We are using obs_flat, so take the flattened shape as input.
        inputs = tf.keras.layers.Input(
            shape=(int(np.prod(obs_space.shape)), ), name="observations")
        # Last hidden layer output (before logits outputs).
        last_layer = inputs
        # The action distribution outputs.
        logits_out = None
        i = 1

        # Create layers 0 to second-last.
        for size in hiddens[:-1]:
            last_layer = tf.keras.layers.Dense(
                size,
                name="fc_{}".format(i),
                activation=activation,
                kernel_initializer=normc_initializer(1.0))(last_layer)
            i += 1

        # The last layer is adjusted to be of size num_outputs, but it's a
        # layer with activation.
        if no_final_linear and num_outputs:
            logits_out = tf.keras.layers.Dense(
                num_outputs,
                name="fc_out",
                activation=activation,
                kernel_initializer=normc_initializer(1.0))(last_layer)
        # Finish the layers with the provided sizes (`hiddens`), plus -
        # iff num_outputs > 0 - a last linear layer of size num_outputs.
        else:
            if len(hiddens) > 0:
                last_layer = tf.keras.layers.Dense(
                    hiddens[-1],
                    name="fc_{}".format(i),
                    activation=activation,
                    kernel_initializer=normc_initializer(1.0))(last_layer)
            if num_outputs:
                logits_out = tf.keras.layers.Dense(
                    num_outputs,
                    name="fc_out",
                    activation=None,
                    kernel_initializer=normc_initializer(0.01))(last_layer)
            # Adjust num_outputs to be the number of nodes in the last layer.
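            # (With num_outputs=None, this model acts as a plain trunk;
            # callers read `self.num_outputs` to size their own heads.)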
            else:
                self.num_outputs = (
                    [int(np.prod(obs_space.shape))] + hiddens[-1:])[-1]

        # Concat the log std vars to the end of the state-dependent means.
        if free_log_std and logits_out is not None:

            def tiled_log_std(x):
                return tf.tile(
                    tf.expand_dims(self.log_std_var, 0), [tf.shape(x)[0], 1])

            log_std_out = tf.keras.layers.Lambda(tiled_log_std)(inputs)
            logits_out = tf.keras.layers.Concatenate(axis=1)(
                [logits_out, log_std_out])

        last_vf_layer = None
        if not vf_share_layers:
            # Build a parallel set of hidden layers for the value net.
            last_vf_layer = inputs
            i = 1
            for size in hiddens:
                last_vf_layer = tf.keras.layers.Dense(
                    size,
                    name="fc_value_{}".format(i),
                    activation=activation,
                    kernel_initializer=normc_initializer(1.0))(last_vf_layer)
                i += 1

        value_out = tf.keras.layers.Dense(
            1,
            name="value_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(
                last_vf_layer if last_vf_layer is not None else last_layer)

        self.base_model = tf.keras.Model(
            inputs, [(logits_out
                      if logits_out is not None else last_layer), value_out])

    def forward(self, input_dict: Dict[str, TensorType],
                state: List[TensorType],
                seq_lens: TensorType) -> (TensorType, List[TensorType]):
        model_out, self._value_out = self.base_model(input_dict["obs_flat"])
        return model_out, state

    def value_function(self) -> TensorType:
        return tf.reshape(self._value_out, [-1])
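

# A minimal usage sketch (illustrative, not part of this module): RLlib picks
# this class up as its default TF model, but it can also be registered as a
# custom model via the real `ModelCatalog.register_custom_model()` API. The
# registry key and config values below are assumptions for the example.
#
#     from ray.rllib.models import ModelCatalog
#     ModelCatalog.register_custom_model("my_fcnet", FullyConnectedNetwork)
#     config = {
#         "model": {
#             "custom_model": "my_fcnet",
#             "fcnet_hiddens": [256, 256],
#             "fcnet_activation": "tanh",
#         },
#     }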


class Keras_FullyConnectedNetwork(tf.keras.Model if tf else object):
    """Generic fully connected network implemented in tf Keras."""

    def __init__(
            self,
            input_space: gym.spaces.Space,
            action_space: gym.spaces.Space,
            num_outputs: Optional[int] = None,
            *,
            name: str = "",
            fcnet_hiddens: Optional[Sequence[int]] = (),
            fcnet_activation: Optional[str] = None,
            post_fcnet_hiddens: Optional[Sequence[int]] = (),
            post_fcnet_activation: Optional[str] = None,
            no_final_linear: bool = False,
            vf_share_layers: bool = False,
            free_log_std: bool = False,
            **kwargs,
    ):
        super().__init__(name=name)

        hiddens = list(fcnet_hiddens or ()) + \
            list(post_fcnet_hiddens or ())
        activation = fcnet_activation
        if not fcnet_hiddens:
            activation = post_fcnet_activation
        activation = get_activation_fn(activation)

        # Generate free-floating bias variables for the second half of
        # the outputs.
        if free_log_std:
            assert num_outputs % 2 == 0, (
                "num_outputs must be divisible by two", num_outputs)
            num_outputs = num_outputs // 2
            self.log_std_var = tf.Variable(
                [0.0] * num_outputs, dtype=tf.float32, name="log_std")

        # We are using obs_flat, so take the flattened shape as input.
        inputs = tf.keras.layers.Input(
            shape=(int(np.prod(input_space.shape)), ), name="observations")
        # Last hidden layer output (before logits outputs).
        last_layer = inputs
        # The action distribution outputs.
        logits_out = None
        i = 1

        # Create layers 0 to second-last.
        for size in hiddens[:-1]:
            last_layer = tf.keras.layers.Dense(
                size,
                name="fc_{}".format(i),
                activation=activation,
                kernel_initializer=normc_initializer(1.0))(last_layer)
            i += 1

        # The last layer is adjusted to be of size num_outputs, but it's a
        # layer with activation.
        if no_final_linear and num_outputs:
            logits_out = tf.keras.layers.Dense(
                num_outputs,
                name="fc_out",
                activation=activation,
                kernel_initializer=normc_initializer(1.0))(last_layer)
        # Finish the layers with the provided sizes (`hiddens`), plus -
        # iff num_outputs > 0 - a last linear layer of size num_outputs.
        else:
            if len(hiddens) > 0:
                last_layer = tf.keras.layers.Dense(
                    hiddens[-1],
                    name="fc_{}".format(i),
                    activation=activation,
                    kernel_initializer=normc_initializer(1.0))(last_layer)
            if num_outputs:
                logits_out = tf.keras.layers.Dense(
                    num_outputs,
                    name="fc_out",
                    activation=None,
                    kernel_initializer=normc_initializer(0.01))(last_layer)

        # Concat the log std vars to the end of the state-dependent means.
        if free_log_std and logits_out is not None:

            def tiled_log_std(x):
                return tf.tile(
                    tf.expand_dims(self.log_std_var, 0), [tf.shape(x)[0], 1])

            log_std_out = tf.keras.layers.Lambda(tiled_log_std)(inputs)
            logits_out = tf.keras.layers.Concatenate(axis=1)(
                [logits_out, log_std_out])

        last_vf_layer = None
        if not vf_share_layers:
            # Build a parallel set of hidden layers for the value net.
            last_vf_layer = inputs
            i = 1
            for size in hiddens:
                last_vf_layer = tf.keras.layers.Dense(
                    size,
                    name="fc_value_{}".format(i),
                    activation=activation,
                    kernel_initializer=normc_initializer(1.0))(last_vf_layer)
                i += 1

        value_out = tf.keras.layers.Dense(
            1,
            name="value_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(
                last_vf_layer if last_vf_layer is not None else last_layer)

        self.base_model = tf.keras.Model(
            inputs, [(logits_out
                      if logits_out is not None else last_layer), value_out])

    def call(self, input_dict: SampleBatch) -> \
            (TensorType, List[TensorType], Dict[str, TensorType]):
        model_out, value_out = self.base_model(input_dict[SampleBatch.OBS])
        extra_outs = {SampleBatch.VF_PREDS: tf.reshape(value_out, [-1])}
        return model_out, [], extra_outs
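

if __name__ == "__main__":
    # Minimal smoke test: a sketch under assumptions (TensorFlow installed;
    # toy Box/Discrete spaces, layer sizes, and batch size chosen purely for
    # illustration). A plain dict keyed like a SampleBatch stands in for the
    # batch object RLlib would normally pass to `call()`.
    obs_space = gym.spaces.Box(-1.0, 1.0, (4, ), dtype=np.float32)
    action_space = gym.spaces.Discrete(2)
    model = Keras_FullyConnectedNetwork(
        obs_space,
        action_space,
        num_outputs=2,
        name="fcnet",
        fcnet_hiddens=[16, 16],
        fcnet_activation="tanh",
    )
    batch = {SampleBatch.OBS: np.zeros((3, 4), dtype=np.float32)}
    logits, state, extra = model(batch)
    # Expect logits of shape (3, 2) and one value prediction per batch row.
    print(logits.shape, extra[SampleBatch.VF_PREDS].shape)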