from collections import namedtuple
import logging
from typing import List

import numpy as np

from ray.rllib.offline.io_context import IOContext
from ray.rllib.policy import Policy
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.utils.annotations import Deprecated, DeveloperAPI
from ray.rllib.utils.numpy import convert_to_numpy
from ray.rllib.utils.typing import SampleBatchType, TensorType

logger = logging.getLogger(__name__)

OffPolicyEstimate = namedtuple("OffPolicyEstimate",
                               ["estimator_name", "metrics"])


@DeveloperAPI
class OffPolicyEstimator:
    """Interface for an off-policy reward estimator."""

    @DeveloperAPI
    def __init__(self, policy: Policy, gamma: float):
        """Initializes an OffPolicyEstimator instance.

        Args:
            policy: Policy to evaluate.
            gamma: Discount factor of the environment.
        """
        self.policy = policy
        self.gamma = gamma
        self.new_estimates = []

    @classmethod
    def create_from_io_context(cls, ioctx: IOContext) -> "OffPolicyEstimator":
        """Creates an off-policy estimator from an IOContext object.

        Extracts Policy and gamma (discount factor) information from the
        IOContext.

        Args:
            ioctx: The IOContext object to create the OffPolicyEstimator
                from.

        Returns:
            The OffPolicyEstimator object created from the IOContext object.
        """
        gamma = ioctx.worker.policy_config["gamma"]
        # Grab a reference to the current policy.
        keys = list(ioctx.worker.policy_map.keys())
        if len(keys) > 1:
            raise NotImplementedError(
                "Off-policy estimation is not implemented for multi-agent. "
                "You can set `input_evaluation: []` to resolve this.")
        policy = ioctx.worker.get_policy(keys[0])
        return cls(policy, gamma)
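
    # Usage note (a sketch, not part of the original API docs): RLlib
    # normally constructs estimators itself from the `input_evaluation`
    # config (e.g. `"input_evaluation": ["is", "wis"]` together with an
    # `"input"` path to offline data), so this classmethod is rarely
    # called by hand.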

    @DeveloperAPI
    def estimate(self, batch: SampleBatchType) -> OffPolicyEstimate:
        """Returns an off-policy estimate for the given batch of experiences.

        The batch will contain data from at most one episode, but it may
        also be only a fragment of that episode.

        Args:
            batch: The batch to calculate the off-policy estimate (OPE) on.

        Returns:
            The off-policy estimate (OPE) calculated on the given batch.
        """
        raise NotImplementedError

    @DeveloperAPI
    def action_log_likelihood(self, batch: SampleBatchType) -> TensorType:
        """Returns log likelihoods for actions in the given batch for policy.

        Computes likelihoods by passing the observations through the current
        policy's `compute_log_likelihoods()` method.

        Args:
            batch: The SampleBatch or MultiAgentBatch to calculate action
                log likelihoods from. This batch/these batches must contain
                the OBS and ACTIONS keys.

        Returns:
            The log likelihoods of the actions in the batch, given the
            observations and the policy.
        """
        num_state_inputs = 0
        for k in batch.keys():
            if k.startswith("state_in_"):
                num_state_inputs += 1
        state_keys = ["state_in_{}".format(i) for i in range(num_state_inputs)]
        log_likelihoods: TensorType = self.policy.compute_log_likelihoods(
            actions=batch[SampleBatch.ACTIONS],
            obs_batch=batch[SampleBatch.OBS],
            state_batches=[batch[k] for k in state_keys],
            prev_action_batch=batch.get(SampleBatch.PREV_ACTIONS),
            prev_reward_batch=batch.get(SampleBatch.PREV_REWARDS),
            actions_normalized=True,
        )
        # Return log likelihoods (not probabilities), as the method name and
        # docstring promise; callers that need action probabilities can apply
        # `np.exp()` to this result.
        return convert_to_numpy(log_likelihoods)

    @DeveloperAPI
    def process(self, batch: SampleBatchType) -> None:
        """Computes off-policy estimates (OPE) on batch and stores results.

        Results collected thus far can then be retrieved by calling
        `self.get_metrics()` (which also flushes the internal results
        storage).

        Args:
            batch: The batch to process (call `self.estimate()` on) and
                store results (OPEs) for.
        """
        self.new_estimates.append(self.estimate(batch))

    @DeveloperAPI
    def check_can_estimate_for(self, batch: SampleBatchType) -> None:
        """Checks if we support off-policy estimation (OPE) on given batch.

        Args:
            batch: The batch to check.

        Raises:
            ValueError: If the `action_prob` key is not in the batch, or if
                the batch is a MultiAgentBatch.
        """
        if isinstance(batch, MultiAgentBatch):
            raise ValueError(
                "Off-policy estimation is not implemented for multi-agent "
                "batches. You can set `input_evaluation: []` to resolve "
                "this.")
        if "action_prob" not in batch:
            raise ValueError(
                "Off-policy estimation is not possible unless the inputs "
                "include action probabilities (i.e., the policy is stochastic "
                "and emits the 'action_prob' key). For DQN this means using "
                "`exploration_config: {type: 'SoftQ'}`. You can also set "
                "`input_evaluation: []` to disable estimation.")

    @DeveloperAPI
    def get_metrics(self) -> List[OffPolicyEstimate]:
        """Returns list of new episode metric estimates since the last call.

        Returns:
            List of OffPolicyEstimate objects.
        """
        out = self.new_estimates
        self.new_estimates = []
        return out

    @Deprecated(new="OffPolicyEstimator.create_from_io_context", error=False)
    def create(self, *args, **kwargs):
        return self.create_from_io_context(*args, **kwargs)

    @Deprecated(new="OffPolicyEstimator.action_log_likelihood", error=False)
    def action_prob(self, *args, **kwargs):
        # Preserve this deprecated alias' probability semantics: exponentiate
        # the log likelihoods returned by `action_log_likelihood()`.
        return np.exp(self.action_log_likelihood(*args, **kwargs))
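

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of RLlib's API): a minimal subclass showing
# how `estimate()` can combine the helpers above into a step-wise importance
# sampling (IS) estimate. All names here (`_ExampleISEstimator`, the metric
# key) are made up for this example; see RLlib's shipped estimators (e.g.
# importance sampling / weighted importance sampling) for the real
# implementations.
class _ExampleISEstimator(OffPolicyEstimator):
    def estimate(self, batch: SampleBatchType) -> OffPolicyEstimate:
        self.check_can_estimate_for(batch)
        # Target-policy action probabilities (exp of the log likelihoods).
        new_probs = np.exp(self.action_log_likelihood(batch))
        # Behavior-policy action probabilities logged with the batch.
        old_probs = np.asarray(batch["action_prob"])
        rewards = np.asarray(batch[SampleBatch.REWARDS])
        # Cumulative per-step importance ratios: p_t = prod_{i<=t} pi/mu.
        ratios = np.cumprod(new_probs / old_probs)
        discounts = self.gamma ** np.arange(len(rewards))
        # Step-wise IS estimate of the discounted episode return.
        v_step_is = float(np.sum(ratios * discounts * rewards))
        return OffPolicyEstimate("example_is", {"V_step_IS": v_step_is})


# Hedged usage flow (assumes `estimator` was built for the policy under
# evaluation and `episodes` yields one SampleBatch per episode):
#
#     for episode in episodes:
#         estimator.process(episode)
#     for estimate in estimator.get_metrics():
#         print(estimate.estimator_name, estimate.metrics)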