misc.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. """ Code adapted from https://github.com/ikostrikov/pytorch-a3c"""
  2. import numpy as np
  3. from typing import Union, Tuple, Any, List
  4. from ray.rllib.models.utils import get_activation_fn
  5. from ray.rllib.utils.framework import try_import_torch
  6. from ray.rllib.utils.typing import TensorType
  7. torch, nn = try_import_torch()
  8. def normc_initializer(std: float = 1.0) -> Any:
  9. def initializer(tensor):
  10. tensor.data.normal_(0, 1)
  11. tensor.data *= std / torch.sqrt(
  12. tensor.data.pow(2).sum(1, keepdim=True))
  13. return initializer
  14. def same_padding(in_size: Tuple[int, int], filter_size: Tuple[int, int],
  15. stride_size: Union[int, Tuple[int, int]]
  16. ) -> (Union[int, Tuple[int, int]], Tuple[int, int]):
  17. """Note: Padding is added to match TF conv2d `same` padding. See
  18. www.tensorflow.org/versions/r0.12/api_docs/python/nn/convolution
  19. Args:
  20. in_size (tuple): Rows (Height), Column (Width) for input
  21. stride_size (Union[int,Tuple[int, int]]): Rows (Height), column (Width)
  22. for stride. If int, height == width.
  23. filter_size (tuple): Rows (Height), column (Width) for filter
  24. Returns:
  25. padding (tuple): For input into torch.nn.ZeroPad2d.
  26. output (tuple): Output shape after padding and convolution.
  27. """
  28. in_height, in_width = in_size
  29. if isinstance(filter_size, int):
  30. filter_height, filter_width = filter_size, filter_size
  31. else:
  32. filter_height, filter_width = filter_size
  33. if isinstance(stride_size, (int, float)):
  34. stride_height, stride_width = int(stride_size), int(stride_size)
  35. else:
  36. stride_height, stride_width = int(stride_size[0]), int(stride_size[1])
  37. out_height = np.ceil(float(in_height) / float(stride_height))
  38. out_width = np.ceil(float(in_width) / float(stride_width))
  39. pad_along_height = int(
  40. ((out_height - 1) * stride_height + filter_height - in_height))
  41. pad_along_width = int(
  42. ((out_width - 1) * stride_width + filter_width - in_width))
  43. pad_top = pad_along_height // 2
  44. pad_bottom = pad_along_height - pad_top
  45. pad_left = pad_along_width // 2
  46. pad_right = pad_along_width - pad_left
  47. padding = (pad_left, pad_right, pad_top, pad_bottom)
  48. output = (out_height, out_width)
  49. return padding, output
  50. class SlimConv2d(nn.Module):
  51. """Simple mock of tf.slim Conv2d"""
  52. def __init__(
  53. self,
  54. in_channels: int,
  55. out_channels: int,
  56. kernel: Union[int, Tuple[int, int]],
  57. stride: Union[int, Tuple[int, int]],
  58. padding: Union[int, Tuple[int, int]],
  59. # Defaulting these to nn.[..] will break soft torch import.
  60. initializer: Any = "default",
  61. activation_fn: Any = "default",
  62. bias_init: float = 0):
  63. """Creates a standard Conv2d layer, similar to torch.nn.Conv2d
  64. Args:
  65. in_channels(int): Number of input channels
  66. out_channels (int): Number of output channels
  67. kernel (Union[int, Tuple[int, int]]): If int, the kernel is
  68. a tuple(x,x). Elsewise, the tuple can be specified
  69. stride (Union[int, Tuple[int, int]]): Controls the stride
  70. for the cross-correlation. If int, the stride is a
  71. tuple(x,x). Elsewise, the tuple can be specified
  72. padding (Union[int, Tuple[int, int]]): Controls the amount
  73. of implicit zero-paddings during the conv operation
  74. initializer (Any): Initializer function for kernel weights
  75. activation_fn (Any): Activation function at the end of layer
  76. bias_init (float): Initalize bias weights to bias_init const
  77. """
  78. super(SlimConv2d, self).__init__()
  79. layers = []
  80. # Padding layer.
  81. if padding:
  82. layers.append(nn.ZeroPad2d(padding))
  83. # Actual Conv2D layer (including correct initialization logic).
  84. conv = nn.Conv2d(in_channels, out_channels, kernel, stride)
  85. if initializer:
  86. if initializer == "default":
  87. initializer = nn.init.xavier_uniform_
  88. initializer(conv.weight)
  89. nn.init.constant_(conv.bias, bias_init)
  90. layers.append(conv)
  91. # Activation function (if any; default=ReLu).
  92. if isinstance(activation_fn, str):
  93. if activation_fn == "default":
  94. activation_fn = nn.ReLU
  95. else:
  96. activation_fn = get_activation_fn(activation_fn, "torch")
  97. if activation_fn is not None:
  98. layers.append(activation_fn())
  99. # Put everything in sequence.
  100. self._model = nn.Sequential(*layers)
  101. def forward(self, x: TensorType) -> TensorType:
  102. return self._model(x)
  103. class SlimFC(nn.Module):
  104. """Simple PyTorch version of `linear` function"""
  105. def __init__(self,
  106. in_size: int,
  107. out_size: int,
  108. initializer: Any = None,
  109. activation_fn: Any = None,
  110. use_bias: bool = True,
  111. bias_init: float = 0.0):
  112. """Creates a standard FC layer, similar to torch.nn.Linear
  113. Args:
  114. in_size(int): Input size for FC Layer
  115. out_size (int): Output size for FC Layer
  116. initializer (Any): Initializer function for FC layer weights
  117. activation_fn (Any): Activation function at the end of layer
  118. use_bias (bool): Whether to add bias weights or not
  119. bias_init (float): Initalize bias weights to bias_init const
  120. """
  121. super(SlimFC, self).__init__()
  122. layers = []
  123. # Actual nn.Linear layer (including correct initialization logic).
  124. linear = nn.Linear(in_size, out_size, bias=use_bias)
  125. if initializer is None:
  126. initializer = nn.init.xavier_uniform_
  127. initializer(linear.weight)
  128. if use_bias is True:
  129. nn.init.constant_(linear.bias, bias_init)
  130. layers.append(linear)
  131. # Activation function (if any; default=None (linear)).
  132. if isinstance(activation_fn, str):
  133. activation_fn = get_activation_fn(activation_fn, "torch")
  134. if activation_fn is not None:
  135. layers.append(activation_fn())
  136. # Put everything in sequence.
  137. self._model = nn.Sequential(*layers)
  138. def forward(self, x: TensorType) -> TensorType:
  139. return self._model(x)
  140. class AppendBiasLayer(nn.Module):
  141. """Simple bias appending layer for free_log_std."""
  142. def __init__(self, num_bias_vars: int):
  143. super().__init__()
  144. self.log_std = torch.nn.Parameter(
  145. torch.as_tensor([0.0] * num_bias_vars))
  146. self.register_parameter("log_std", self.log_std)
  147. def forward(self, x: TensorType) -> TensorType:
  148. out = torch.cat(
  149. [x, self.log_std.unsqueeze(0).repeat([len(x), 1])], axis=1)
  150. return out
  151. class Reshape(nn.Module):
  152. """Standard module that reshapes/views a tensor
  153. """
  154. def __init__(self, shape: List):
  155. super().__init__()
  156. self.shape = shape
  157. def forward(self, x):
  158. return x.view(*self.shape)