##########
# Contribution by the Center on Long-Term Risk:
# https://github.com/longtermrisk/marltoolbox
# Some parts are originally from:
# https://github.com/julianstastny/openspiel-social-dilemmas/
# blob/master/games/coin_game_gym.py
##########
import copy
from collections.abc import Iterable

import numpy as np
from numba import jit, prange
from numba.typed import List
from ray.rllib.examples.env.coin_game_non_vectorized_env import CoinGame
from ray.rllib.utils import override


class VectorizedCoinGame(CoinGame):
    """
    Vectorized Coin Game environment.
    """

    def __init__(self, config=None):
        if config is None:
            config = {}

        super().__init__(config)

        self.batch_size = config.get("batch_size", 1)
        self.force_vectorized = config.get("force_vectorize", False)
        assert self.grid_size == 3, "hardcoded in the place_coin function"
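        # Example config (illustrative only): "batch_size" and "force_vectorize"
        # are read here; the remaining keys are assumed to be consumed by the
        # parent CoinGame (names inferred from the attributes used in this file):
        # {
        #     "batch_size": 128,
        #     "force_vectorize": False,
        #     "grid_size": 3,
        #     "max_steps": 20,
        #     "asymmetric": False,
        #     "both_players_can_pick_the_same_coin": True,
        # }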

    @override(CoinGame)
    def _randomize_color_and_player_positions(self):
        # Reset coin color and the players and coin positions
        self.red_coin = np.random.randint(2, size=self.batch_size)
        self.red_pos = np.random.randint(self.grid_size, size=(self.batch_size, 2))
        self.blue_pos = np.random.randint(self.grid_size, size=(self.batch_size, 2))
        self.coin_pos = np.zeros((self.batch_size, 2), dtype=np.int8)

        self._players_do_not_overlap_at_start()

    @override(CoinGame)
    def _players_do_not_overlap_at_start(self):
        for i in range(self.batch_size):
            while _same_pos(self.red_pos[i], self.blue_pos[i]):
                self.blue_pos[i] = np.random.randint(self.grid_size, size=2)

    @override(CoinGame)
    def _generate_coin(self):
        generate = np.ones(self.batch_size, dtype=bool)
        self.coin_pos = generate_coin(
            self.batch_size,
            generate,
            self.red_coin,
            self.red_pos,
            self.blue_pos,
            self.coin_pos,
            self.grid_size,
        )

    @override(CoinGame)
    def _generate_observation(self):
        obs = generate_observations_wt_numba_optimization(
            self.batch_size,
            self.red_pos,
            self.blue_pos,
            self.coin_pos,
            self.red_coin,
            self.grid_size,
        )
        obs = self._get_obs_invariant_to_the_player_trained(obs)
        obs, _ = self._optional_unvectorize(obs)
        return obs

    def _optional_unvectorize(self, obs, rewards=None):
        # With batch_size == 1 (and unless vectorization is forced), drop the
        # batch dimension so outputs match the non-vectorized CoinGame.
        if self.batch_size == 1 and not self.force_vectorized:
            obs = [one_obs[0, ...] for one_obs in obs]
            if rewards is not None:
                rewards[0], rewards[1] = rewards[0][0], rewards[1][0]
        return obs, rewards

    @override(CoinGame)
    def step(self, actions: Iterable):
        actions = self._from_RLlib_API_to_list(actions)
        self.step_count_in_current_episode += 1

        (
            self.red_pos,
            self.blue_pos,
            rewards,
            self.coin_pos,
            observation,
            self.red_coin,
            red_pick_any,
            red_pick_red,
            blue_pick_any,
            blue_pick_blue,
        ) = vectorized_step_wt_numba_optimization(
            actions,
            self.batch_size,
            self.red_pos,
            self.blue_pos,
            self.coin_pos,
            self.red_coin,
            self.grid_size,
            self.asymmetric,
            self.max_steps,
            self.both_players_can_pick_the_same_coin,
        )

        if self.output_additional_info:
            self._accumulate_info(
                red_pick_any, red_pick_red, blue_pick_any, blue_pick_blue
            )

        obs = self._get_obs_invariant_to_the_player_trained(observation)
        obs, rewards = self._optional_unvectorize(obs, rewards)

        return self._to_RLlib_API(obs, rewards)

    @override(CoinGame)
    def _get_episode_info(self):
        # pick_speed: average number of coins picked per step and per environment;
        # pick_own_color: fraction of picked coins matching the player's color.
        player_red_info, player_blue_info = {}, {}

        if len(self.red_pick) > 0:
            red_pick = sum(self.red_pick)
            player_red_info["pick_speed"] = red_pick / (
                len(self.red_pick) * self.batch_size
            )
            if red_pick > 0:
                player_red_info["pick_own_color"] = sum(self.red_pick_own) / red_pick

        if len(self.blue_pick) > 0:
            blue_pick = sum(self.blue_pick)
            player_blue_info["pick_speed"] = blue_pick / (
                len(self.blue_pick) * self.batch_size
            )
            if blue_pick > 0:
                player_blue_info["pick_own_color"] = sum(self.blue_pick_own) / blue_pick

        return player_red_info, player_blue_info

    @override(CoinGame)
    def _from_RLlib_API_to_list(self, actions):
        ac_red = actions[self.player_red_id]
        ac_blue = actions[self.player_blue_id]
        if not isinstance(ac_red, Iterable):
            assert not isinstance(ac_blue, Iterable)
            ac_red, ac_blue = [ac_red], [ac_blue]
        actions = [ac_red, ac_blue]
        actions = np.array(actions).T
        return actions
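
    # Illustrative example of the conversion above: scalar actions such as
    # {self.player_red_id: 1, self.player_blue_id: 3} become np.array([[1, 3]]),
    # i.e. an array of shape (batch_size, 2) with the red action in column 0.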

    def _save_env(self):
        env_save_state = {
            "red_pos": self.red_pos,
            "blue_pos": self.blue_pos,
            "coin_pos": self.coin_pos,
            "red_coin": self.red_coin,
            "grid_size": self.grid_size,
            "asymmetric": self.asymmetric,
            "batch_size": self.batch_size,
            "step_count_in_current_episode": self.step_count_in_current_episode,
            "max_steps": self.max_steps,
            "red_pick": self.red_pick,
            "red_pick_own": self.red_pick_own,
            "blue_pick": self.blue_pick,
            "blue_pick_own": self.blue_pick_own,
            "both_players_can_pick_the_same_coin": self.both_players_can_pick_the_same_coin,  # noqa: E501
        }
        return copy.deepcopy(env_save_state)

    def _load_env(self, env_state):
        for k, v in env_state.items():
            self.__setattr__(k, v)
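
    # Sketch of how the two helpers above can be used to snapshot and restore
    # the environment (hypothetical variable names):
    #   snapshot = env._save_env()
    #   ...step the environment...
    #   env._load_env(snapshot)  # restores positions, coin color and counters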


class AsymVectorizedCoinGame(VectorizedCoinGame):
    NAME = "AsymCoinGame"

    def __init__(self, config=None):
        if config is None:
            config = {}

        if "asymmetric" in config:
            assert config["asymmetric"]
        else:
            config["asymmetric"] = True
        super().__init__(config)


@jit(nopython=True)
def move_players(batch_size, actions, red_pos, blue_pos, grid_size):
    # Action encoding, from the moves list below:
    # 0 -> (0, +1), 1 -> (0, -1), 2 -> (+1, 0), 3 -> (-1, 0).
    # The modulo makes positions wrap around the grid edges.
    moves = List(
        [
            np.array([0, 1]),
            np.array([0, -1]),
            np.array([1, 0]),
            np.array([-1, 0]),
        ]
    )

    for j in prange(batch_size):
        red_pos[j] = (red_pos[j] + moves[actions[j, 0]]) % grid_size
        blue_pos[j] = (blue_pos[j] + moves[actions[j, 1]]) % grid_size
    return red_pos, blue_pos


@jit(nopython=True)
def compute_reward(
    batch_size,
    red_pos,
    blue_pos,
    coin_pos,
    red_coin,
    asymmetric,
    both_players_can_pick_the_same_coin,
):
    reward_red = np.zeros(batch_size)
    reward_blue = np.zeros(batch_size)
    generate = np.zeros(batch_size, dtype=np.bool_)
    red_pick_any, red_pick_red, blue_pick_any, blue_pick_blue = 0, 0, 0, 0

    for i in prange(batch_size):
        red_first_if_both = None
        if not both_players_can_pick_the_same_coin:
            if _same_pos(red_pos[i], coin_pos[i]) and _same_pos(
                blue_pos[i], coin_pos[i]
            ):
                # Fair coin flip deciding which player gets the coin when both
                # reach it (randint(0, 2), since randint(0, 1) always returns 0).
                red_first_if_both = bool(np.random.randint(0, 2))

        if red_coin[i]:
            if _same_pos(red_pos[i], coin_pos[i]) and (
                red_first_if_both is None or red_first_if_both
            ):
                generate[i] = True
                reward_red[i] += 1
                if asymmetric:
                    reward_red[i] += 3
                red_pick_any += 1
                red_pick_red += 1
            if _same_pos(blue_pos[i], coin_pos[i]) and (
                red_first_if_both is None or not red_first_if_both
            ):
                generate[i] = True
                reward_red[i] += -2
                reward_blue[i] += 1
                blue_pick_any += 1
        else:
            if _same_pos(red_pos[i], coin_pos[i]) and (
                red_first_if_both is None or red_first_if_both
            ):
                generate[i] = True
                reward_red[i] += 1
                reward_blue[i] += -2
                if asymmetric:
                    reward_red[i] += 3
                red_pick_any += 1
            if _same_pos(blue_pos[i], coin_pos[i]) and (
                red_first_if_both is None or not red_first_if_both
            ):
                generate[i] = True
                reward_blue[i] += 1
                blue_pick_any += 1
                blue_pick_blue += 1

    reward = [reward_red, reward_blue]
    return reward, generate, red_pick_any, red_pick_red, blue_pick_any, blue_pick_blue
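

# Payoff summary of compute_reward (symmetric case): a player gets +1 for picking
# any coin, and the coin's owner gets -2 when the other player picks it; in the
# asymmetric variant, red gets an extra +3 for every coin it picks.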


@jit(nopython=True)
def _same_pos(x, y):
    return (x == y).all()


@jit(nopython=True)
def _flatten_index(pos, grid_size):
    y_pos, x_pos = pos
    idx = grid_size * y_pos
    idx += x_pos
    return idx


@jit(nopython=True)
def _unflatten_index(pos, grid_size):
    x_idx = pos % grid_size
    y_idx = pos // grid_size
    return np.array([y_idx, x_idx])


@jit(nopython=True)
def generate_coin(
    batch_size, generate, red_coin, red_pos, blue_pos, coin_pos, grid_size
):
    # For every environment flagged in `generate`, flip the coin color and
    # respawn the coin on a cell not occupied by a player.
    red_coin[generate] = 1 - red_coin[generate]
    for i in prange(batch_size):
        if generate[i]:
            coin_pos[i] = place_coin(red_pos[i], blue_pos[i], grid_size)
    return coin_pos


@jit(nopython=True)
def place_coin(red_pos_i, blue_pos_i, grid_size):
    # Sample a cell uniformly among those not occupied by either player.
    # range(9) hardcodes grid_size == 3 (see the assert in VectorizedCoinGame).
    red_pos_flat = _flatten_index(red_pos_i, grid_size)
    blue_pos_flat = _flatten_index(blue_pos_i, grid_size)
    possible_coin_pos = np.array(
        [x for x in range(9) if ((x != blue_pos_flat) and (x != red_pos_flat))]
    )
    flat_coin_pos = np.random.choice(possible_coin_pos)
    return _unflatten_index(flat_coin_pos, grid_size)


@jit(nopython=True)
def generate_observations_wt_numba_optimization(
    batch_size, red_pos, blue_pos, coin_pos, red_coin, grid_size
):
    obs = np.zeros((batch_size, grid_size, grid_size, 4))
    for i in prange(batch_size):
        obs[i, red_pos[i][0], red_pos[i][1], 0] = 1
        obs[i, blue_pos[i][0], blue_pos[i][1], 1] = 1
        if red_coin[i]:
            obs[i, coin_pos[i][0], coin_pos[i][1], 2] = 1
        else:
            obs[i, coin_pos[i][0], coin_pos[i][1], 3] = 1
    return obs
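

# The observation built above has shape (batch_size, grid_size, grid_size, 4):
# channel 0 marks the red player, channel 1 the blue player, channel 2 a red coin
# and channel 3 a blue coin.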


@jit(nopython=True)
def vectorized_step_wt_numba_optimization(
    actions,
    batch_size,
    red_pos,
    blue_pos,
    coin_pos,
    red_coin,
    grid_size: int,
    asymmetric: bool,
    max_steps: int,
    both_players_can_pick_the_same_coin: bool,
):
    # Note: max_steps is passed in but not used inside this helper.
    red_pos, blue_pos = move_players(batch_size, actions, red_pos, blue_pos, grid_size)

    (
        reward,
        generate,
        red_pick_any,
        red_pick_red,
        blue_pick_any,
        blue_pick_blue,
    ) = compute_reward(
        batch_size,
        red_pos,
        blue_pos,
        coin_pos,
        red_coin,
        asymmetric,
        both_players_can_pick_the_same_coin,
    )

    coin_pos = generate_coin(
        batch_size, generate, red_coin, red_pos, blue_pos, coin_pos, grid_size
    )

    obs = generate_observations_wt_numba_optimization(
        batch_size, red_pos, blue_pos, coin_pos, red_coin, grid_size
    )

    return (
        red_pos,
        blue_pos,
        reward,
        coin_pos,
        obs,
        red_coin,
        red_pick_any,
        red_pick_red,
        blue_pick_any,
        blue_pick_blue,
    )
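

# Minimal usage sketch. It assumes the parent CoinGame exposes the standard
# reset()/step() MultiAgentEnv API and integer actions in [0, 3]
# (see move_players above); adjust if these assumptions do not hold.
if __name__ == "__main__":
    env = VectorizedCoinGame({"batch_size": 4})
    obs = env.reset()
    actions = {
        env.player_red_id: np.random.randint(4, size=env.batch_size),
        env.player_blue_id: np.random.randint(4, size=env.batch_size),
    }
    # step() returns per-player structures as assembled by the parent class.
    step_results = env.step(actions)
    print(step_results)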