off_policy_estimator.py

from collections import namedtuple
import logging
import numpy as np

from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.policy import Policy
from ray.rllib.utils.annotations import Deprecated, DeveloperAPI
from ray.rllib.offline.io_context import IOContext
from ray.rllib.utils.numpy import convert_to_numpy
from ray.rllib.utils.typing import TensorType, SampleBatchType
from typing import List

logger = logging.getLogger(__name__)

OffPolicyEstimate = namedtuple("OffPolicyEstimate",
                               ["estimator_name", "metrics"])

@DeveloperAPI
class OffPolicyEstimator:
    """Interface for an off-policy reward estimator."""

    @DeveloperAPI
    def __init__(self, policy: Policy, gamma: float):
        """Initializes an OffPolicyEstimator instance.

        Args:
            policy: Policy to evaluate.
            gamma: Discount factor of the environment.
        """
        self.policy = policy
        self.gamma = gamma
        self.new_estimates = []

    @classmethod
    def create_from_io_context(cls, ioctx: IOContext) -> "OffPolicyEstimator":
        """Creates an off-policy estimator from an IOContext object.

        Extracts Policy and gamma (discount factor) information from the
        IOContext.

        Args:
            ioctx: The IOContext object to create the OffPolicyEstimator
                from.

        Returns:
            The OffPolicyEstimator object created from the IOContext object.
        """
        gamma = ioctx.worker.policy_config["gamma"]
        # Grab a reference to the current policy.
        keys = list(ioctx.worker.policy_map.keys())
        if len(keys) > 1:
            raise NotImplementedError(
                "Off-policy estimation is not implemented for multi-agent. "
                "You can set `input_evaluation: []` to resolve this.")
        policy = ioctx.worker.get_policy(keys[0])
        return cls(policy, gamma)

    @DeveloperAPI
    def estimate(self, batch: SampleBatchType) -> OffPolicyEstimate:
        """Returns an off-policy estimate for the given batch of experiences.

        The batch contains data from at most one episode and may be only a
        fragment of that episode.

        Args:
            batch: The batch to calculate the off-policy estimate (OPE) on.

        Returns:
            The off-policy estimate (OPE) calculated on the given batch.
        """
        raise NotImplementedError

    @DeveloperAPI
    def action_log_likelihood(self, batch: SampleBatchType) -> TensorType:
        """Returns the likelihoods of the batch's actions under the policy.

        Computes the log likelihoods by passing the observations through the
        current policy's `compute_log_likelihoods()` method, then
        exponentiates them.

        Args:
            batch: The SampleBatch or MultiAgentBatch to calculate action
                log likelihoods from. The batch must contain the OBS and
                ACTIONS keys.

        Returns:
            The (exponentiated) likelihoods of the actions in the batch,
            given the observations and the policy.
        """
        # Count the RNN state inputs present in the batch.
        num_state_inputs = 0
        for k in batch.keys():
            if k.startswith("state_in_"):
                num_state_inputs += 1
        state_keys = ["state_in_{}".format(i) for i in range(num_state_inputs)]
        log_likelihoods: TensorType = self.policy.compute_log_likelihoods(
            actions=batch[SampleBatch.ACTIONS],
            obs_batch=batch[SampleBatch.OBS],
            state_batches=[batch[k] for k in state_keys],
            prev_action_batch=batch.get(SampleBatch.PREV_ACTIONS),
            prev_reward_batch=batch.get(SampleBatch.PREV_REWARDS),
            actions_normalized=True,
        )
        log_likelihoods = convert_to_numpy(log_likelihoods)
        # Note: The log likelihoods are exponentiated here, so the returned
        # values are action likelihoods/probabilities, not log likelihoods.
        return np.exp(log_likelihoods)

    @DeveloperAPI
    def process(self, batch: SampleBatchType) -> None:
        """Computes off-policy estimates (OPE) on batch and stores results.

        Results collected thus far can then be retrieved by calling
        `self.get_metrics()` (which flushes the internal results storage).

        Args:
            batch: The batch to process (call `self.estimate()` on) and
                store results (OPEs) for.
        """
        self.new_estimates.append(self.estimate(batch))

    @DeveloperAPI
    def check_can_estimate_for(self, batch: SampleBatchType) -> None:
        """Checks if we support off-policy estimation (OPE) on given batch.

        Args:
            batch: The batch to check.

        Raises:
            ValueError: In case the `action_prob` key is not in the batch OR
                the batch is a MultiAgentBatch.
        """
        if isinstance(batch, MultiAgentBatch):
            raise ValueError(
                "IS-estimation is not implemented for multi-agent batches. "
                "You can set `input_evaluation: []` to resolve this.")
        if "action_prob" not in batch:
            raise ValueError(
                "Off-policy estimation is not possible unless the inputs "
                "include action probabilities (i.e., the policy is stochastic "
                "and emits the 'action_prob' key). For DQN this means using "
                "`exploration_config: {type: 'SoftQ'}`. You can also set "
                "`input_evaluation: []` to disable estimation.")

    @DeveloperAPI
    def get_metrics(self) -> List[OffPolicyEstimate]:
        """Returns list of new episode metric estimates since the last call.

        Returns:
            List of OffPolicyEstimate objects.
        """
        out = self.new_estimates
        self.new_estimates = []
        return out

    @Deprecated(new="OffPolicyEstimator.create_from_io_context", error=False)
    def create(self, *args, **kwargs):
        return self.create_from_io_context(*args, **kwargs)

    @Deprecated(new="OffPolicyEstimator.action_log_likelihood", error=False)
    def action_prob(self, *args, **kwargs):
        return self.action_log_likelihood(*args, **kwargs)
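

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal subclass
# showing how the interface above is intended to be used. The class name
# `SimpleISEstimator` and the simple step-wise importance-sampling estimate
# below are assumptions for demonstration only, not RLlib's built-in
# estimators.
# ---------------------------------------------------------------------------
class SimpleISEstimator(OffPolicyEstimator):
    """Toy per-episode importance-sampling estimator (illustrative sketch)."""

    def estimate(self, batch: SampleBatchType) -> OffPolicyEstimate:
        self.check_can_estimate_for(batch)
        rewards = batch[SampleBatch.REWARDS]
        old_prob = batch["action_prob"]
        # Base-class helper: action likelihoods under the evaluated policy.
        new_prob = self.action_log_likelihood(batch)
        # Discounted, importance-weighted return over this (partial) episode.
        v_est = 0.0
        weight = 1.0
        for t in range(batch.count):
            weight *= new_prob[t] / old_prob[t]
            v_est += (self.gamma ** t) * weight * rewards[t]
        return OffPolicyEstimate("simple_is", {"V_est": float(v_est)})


# Hypothetical usage (assumes a `policy` and per-episode `episode_batches`
# already exist, e.g. produced by an offline input reader):
#
#   estimator = SimpleISEstimator(policy, gamma=0.99)
#   for episode_batch in episode_batches:
#       estimator.process(episode_batch)  # calls estimate() and stores result
#   metrics = estimator.get_metrics()     # flushes and returns stored results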