##########
# Contribution by the Center on Long-Term Risk:
# https://github.com/longtermrisk/marltoolbox
# Some parts are originally from:
# https://github.com/alshedivat/lola/tree/master/lola
##########

import logging
from abc import ABC
from collections.abc import Iterable
from typing import Dict, Optional

import numpy as np
from gymnasium.spaces import Discrete
from gymnasium.utils import seeding
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.examples.env.utils.interfaces import InfoAccumulationInterface
from ray.rllib.examples.env.utils.mixins import (
    TwoPlayersTwoActionsInfoMixin,
    NPlayersNDiscreteActionsInfoMixin,
)

logger = logging.getLogger(__name__)


class MatrixSequentialSocialDilemma(InfoAccumulationInterface, MultiAgentEnv, ABC):
    """
    A multi-agent abstract class for two-player matrix games.

    PAYOUT_MATRIX: Numpy array. Along dimension N, the action of the
        Nth player changes. The last dimension selects the player
        whose reward you want to read.
    max_steps: number of steps in one episode
    players_ids: list of the RLlib agent IDs of the two players
    output_additional_info: whether the environment should aggregate
        information about the last episode and return it in the info
        dict at the end of the episode.
    """
    def __init__(self, config: Optional[Dict] = None):
        if config is None:
            config = {}

        assert "reward_randomness" not in config.keys()
        assert self.PAYOUT_MATRIX is not None
        if "players_ids" in config:
            assert (
                isinstance(config["players_ids"], Iterable)
                and len(config["players_ids"]) == self.NUM_AGENTS
            )

        self.players_ids = config.get("players_ids", ["player_row", "player_col"])
        self.player_row_id, self.player_col_id = self.players_ids
        self.max_steps = config.get("max_steps", 20)
        self.output_additional_info = config.get("output_additional_info", True)
        self.step_count_in_current_episode = None

        # To store info about the fraction of each state visited.
        if self.output_additional_info:
            self._init_info()
    def reset(self, *, seed=None, options=None):
        self.np_random, seed = seeding.np_random(seed)
        self.step_count_in_current_episode = 0
        if self.output_additional_info:
            self._reset_info()
        # Both players start in the dummy "no previous joint action" state,
        # encoded as the last state index.
        return {
            self.player_row_id: self.NUM_STATES - 1,
            self.player_col_id: self.NUM_STATES - 1,
        }, {}
    def step(self, actions: dict):
        """
        :param actions: Dict containing the actions of both players,
            keyed by their agent IDs.
        :return: observations, rewards, terminateds, truncateds, infos
        """
        self.step_count_in_current_episode += 1
        action_player_row = actions[self.player_row_id]
        action_player_col = actions[self.player_col_id]

        if self.output_additional_info:
            self._accumulate_info(action_player_row, action_player_col)

        observations = self._produce_observations_invariant_to_the_player_trained(
            action_player_row, action_player_col
        )
        rewards = self._get_players_rewards(action_player_row, action_player_col)
        epi_is_done = self.step_count_in_current_episode >= self.max_steps
        if self.step_count_in_current_episode > self.max_steps:
            logger.warning("self.step_count_in_current_episode > self.max_steps")
        info = self._get_info_for_current_epi(epi_is_done)
        return self._to_RLlib_API(observations, rewards, epi_is_done, info)
    def _produce_observations_invariant_to_the_player_trained(
        self, action_player_0: int, action_player_1: int
    ):
        """
        We want to be able to use a policy trained as player 1
        for evaluation as player 2 and vice versa.
        """
        return [
            action_player_0 * self.NUM_ACTIONS + action_player_1,
            action_player_1 * self.NUM_ACTIONS + action_player_0,
        ]
    def _get_players_rewards(self, action_player_0: int, action_player_1: int):
        return [
            self.PAYOUT_MATRIX[action_player_0][action_player_1][0],
            self.PAYOUT_MATRIX[action_player_0][action_player_1][1],
        ]
    def _to_RLlib_API(
        self, observations: list, rewards: list, epi_is_done: bool, info: dict
    ):
        observations = {
            self.player_row_id: observations[0],
            self.player_col_id: observations[1],
        }

        rewards = {self.player_row_id: rewards[0], self.player_col_id: rewards[1]}

        if info is None:
            info = {}
        else:
            info = {self.player_row_id: info, self.player_col_id: info}

        done = {
            self.player_row_id: epi_is_done,
            self.player_col_id: epi_is_done,
            "__all__": epi_is_done,
        }

        # The same dict is returned for both terminateds and truncateds,
        # matching the 5-tuple expected by the gymnasium/RLlib API.
        return observations, rewards, done, done, info
    def _get_info_for_current_epi(self, epi_is_done):
        if epi_is_done and self.output_additional_info:
            info_for_current_epi = self._get_episode_info()
        else:
            info_for_current_epi = None
        return info_for_current_epi

    def __str__(self):
        return self.NAME


class IteratedMatchingPennies(
    TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma
):
    """
    A two-agent environment for the Matching Pennies game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 2
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array([[[+1, -1], [-1, +1]], [[-1, +1], [+1, -1]]])
    NAME = "IMP"


class IteratedPrisonersDilemma(
    TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma
):
    """
    A two-agent environment for the Prisoner's Dilemma game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 2
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array([[[-1, -1], [-3, +0]], [[+0, -3], [-2, -2]]])
    NAME = "IPD"


class IteratedAsymPrisonersDilemma(
    TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma
):
    """
    A two-agent environment for the Asymmetric Prisoner's Dilemma game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 2
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array([[[+0, -1], [-3, +0]], [[+0, -3], [-2, -2]]])
    NAME = "AsymmetricIPD"


class IteratedStagHunt(TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma):
    """
    A two-agent environment for the Stag Hunt game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 2
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array([[[3, 3], [0, 2]], [[2, 0], [1, 1]]])
    NAME = "IteratedStagHunt"


class IteratedChicken(TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma):
    """
    A two-agent environment for the Chicken game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 2
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array([[[+0, +0], [-1.0, +1.0]], [[+1, -1], [-10, -10]]])
    NAME = "IteratedChicken"


class IteratedAsymChicken(TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma):
    """
    A two-agent environment for the Asymmetric Chicken game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 2
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array([[[+2.0, +0], [-1.0, +1.0]], [[+2.5, -1], [-10, -10]]])
    NAME = "AsymmetricIteratedChicken"


class IteratedBoS(TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma):
    """
    A two-agent environment for the BoS game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 2
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array(
        [[[+3.0, +2.0], [+0.0, +0.0]], [[+0.0, +0.0], [+2.0, +3.0]]]
    )
    NAME = "IteratedBoS"


class IteratedAsymBoS(TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma):
    """
    A two-agent environment for the Asymmetric BoS game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 2
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array(
        [[[+4.0, +1.0], [+0.0, +0.0]], [[+0.0, +0.0], [+2.0, +2.0]]]
    )
    NAME = "AsymmetricIteratedBoS"


def define_greed_fear_matrix_game(greed, fear):
    class GreedFearGame(TwoPlayersTwoActionsInfoMixin, MatrixSequentialSocialDilemma):
        NUM_AGENTS = 2
        NUM_ACTIONS = 2
        NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
        ACTION_SPACE = Discrete(NUM_ACTIONS)
        OBSERVATION_SPACE = Discrete(NUM_STATES)
        # Standard reward (R) and punishment (P) payoffs; the temptation (T)
        # and sucker (S) payoffs are shifted by the greed and fear parameters.
        R = 3
        P = 1
        T = R + greed
        S = P - fear
        PAYOUT_MATRIX = np.array([[[R, R], [S, T]], [[T, S], [P, P]]])
        NAME = "IteratedGreedFear"

        def __str__(self):
            return f"{self.NAME} with greed={greed} and fear={fear}"

    return GreedFearGame
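
# Minimal usage sketch for define_greed_fear_matrix_game (the greed/fear
# values and config below are illustrative only):
#
#     GreedFearCls = define_greed_fear_matrix_game(greed=1, fear=1)
#     env = GreedFearCls({"max_steps": 10})
#     obs, infos = env.reset(seed=0)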


class IteratedBoSAndPD(
    NPlayersNDiscreteActionsInfoMixin, MatrixSequentialSocialDilemma
):
    """
    A two-agent environment for the BoS + PD game.
    """

    NUM_AGENTS = 2
    NUM_ACTIONS = 3
    NUM_STATES = NUM_ACTIONS**NUM_AGENTS + 1
    ACTION_SPACE = Discrete(NUM_ACTIONS)
    OBSERVATION_SPACE = Discrete(NUM_STATES)
    PAYOUT_MATRIX = np.array(
        [
            [[3.5, +1], [+0, +0], [-3, +2]],
            [[+0.0, +0], [+1, +3], [-3, +2]],
            [[+2.0, -3], [+2, -3], [-1, -1]],
        ]
    )
    NAME = "IteratedBoSAndPD"
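

if __name__ == "__main__":
    # Smoke-test rollout with random actions (a minimal sketch; random
    # sampling stands in for trained policies here).
    env = IteratedPrisonersDilemma({"max_steps": 5})
    obs, infos = env.reset(seed=0)
    done = False
    while not done:
        actions = {
            agent_id: env.ACTION_SPACE.sample() for agent_id in env.players_ids
        }
        obs, rewards, terminateds, truncateds, infos = env.step(actions)
        done = terminateds["__all__"]
        print(obs, rewards, done)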