import logging
import threading

from six.moves import queue

from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.execution.buffers.minibatch_buffer import MinibatchBuffer
from ray.rllib.execution.learner_thread import LearnerThread
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.annotations import override
from ray.rllib.utils.deprecation import deprecation_warning
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.metrics.learner_info import LearnerInfoBuilder, \
    LEARNER_STATS_KEY
from ray.rllib.utils.timer import TimerStat

tf1, tf, tfv = try_import_tf()

logger = logging.getLogger(__name__)


class MultiGPULearnerThread(LearnerThread):
    """Learner that can use multiple GPUs and parallel loading.

    This class is used for async sampling algorithms.

    Example workflow: 2 GPUs and 3 multi-GPU tower stacks.
    -> On each GPU, there are 3 slots for batches, indexed 0, 1, and 2.

    Workers collect data from the env and push it into the inqueue:
    Workers -> (data) -> self.inqueue

    We also have two queues indicating which stacks are loaded and which
    are not:
    - idle_tower_stacks = [0, 1, 2]  <- all 3 stacks are free at first.
    - ready_tower_stacks = []  <- none of the 3 stacks is loaded with data.

    `ready_tower_stacks` is managed by `ready_tower_stacks_buffer`, which
    allows for several minibatch-SGD iterations per loaded batch (this
    avoids reloading the data from CPU to GPU for each SGD iteration).

    The n _MultiGPULoaderThreads: self.inqueue -get()->
    policy.load_batch_into_buffer() -> ready_stacks = [0 ...]

    This thread: self.ready_tower_stacks_buffer -get()->
    policy.learn_on_loaded_batch() -> if SGD iters are done,
    put the stack index back into the idle_tower_stacks queue.
    """

    def __init__(
        self,
        local_worker: RolloutWorker,
        num_gpus: int = 1,
        lr=None,  # deprecated and ignored
        train_batch_size: int = 500,
        num_multi_gpu_tower_stacks: int = 1,
        num_sgd_iter: int = 1,
        learner_queue_size: int = 16,
        learner_queue_timeout: int = 300,
        num_data_load_threads: int = 16,
        _fake_gpus: bool = False,
        # Deprecated arg: a separate minibatch buffer is no longer needed.
        minibatch_buffer_size=None,
    ):
- """Initializes a MultiGPULearnerThread instance.
- Args:
- local_worker (RolloutWorker): Local RolloutWorker holding
- policies this thread will call `load_batch_into_buffer` and
- `learn_on_loaded_batch` on.
- num_gpus (int): Number of GPUs to use for data-parallel SGD.
- train_batch_size (int): Size of batches (minibatches if
- `num_sgd_iter` > 1) to learn on.
- num_multi_gpu_tower_stacks (int): Number of buffers to parallelly
- load data into on one device. Each buffer is of size of
- `train_batch_size` and hence increases GPU memory usage
- accordingly.
- num_sgd_iter (int): Number of passes to learn on per train batch
- (minibatch if `num_sgd_iter` > 1).
- learner_queue_size (int): Max size of queue of inbound
- train batches to this thread.
- num_data_load_threads (int): Number of threads to use to load
- data into GPU memory in parallel.
- """
        # Deprecated: No need to specify, as the actual minibatch buffer
        # is not needed anymore.
        if minibatch_buffer_size:
            deprecation_warning(
                old="MultiGPULearnerThread.minibatch_buffer_size",
                error=False,
            )
        super().__init__(
            local_worker=local_worker,
            minibatch_buffer_size=0,
            num_sgd_iter=num_sgd_iter,
            learner_queue_size=learner_queue_size,
            learner_queue_timeout=learner_queue_timeout,
        )
        # Delete the reference to the parent's minibatch_buffer, which is
        # not needed. Instead, in multi-GPU mode, we pull tower stack
        # indices from `self.ready_tower_stacks_buffer`, whose size is
        # exactly `num_multi_gpu_tower_stacks`.
        self.minibatch_buffer = None

        self.train_batch_size = train_batch_size

        self.policy_map = self.local_worker.policy_map
        self.devices = next(iter(self.policy_map.values())).devices
        logger.info("MultiGPULearnerThread devices {}".format(self.devices))

        assert self.train_batch_size % len(self.devices) == 0
        assert self.train_batch_size >= len(self.devices), "batch too small"

        self.tower_stack_indices = list(range(num_multi_gpu_tower_stacks))

        # Two queues for tower stacks:
        # a) Those that are loaded with data ("ready").
        # b) Those that are ready to be loaded with new data ("idle").
        self.idle_tower_stacks = queue.Queue()
        self.ready_tower_stacks = queue.Queue()
        # In the beginning, all stacks are idle (no loading has taken
        # place yet).
        for idx in self.tower_stack_indices:
            self.idle_tower_stacks.put(idx)
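        # E.g. with num_multi_gpu_tower_stacks=3:
        #   idle_tower_stacks:  [0, 1, 2]
        #   ready_tower_stacks: [] (empty)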
        # Start n threads that are responsible for loading data into the
        # different (idle) stacks. Keep references to all of them so that
        # `step()` can verify they are still alive.
        self.loader_threads = []
        for i in range(num_data_load_threads):
            loader_thread = _MultiGPULoaderThread(
                self, share_stats=(i == 0))
            loader_thread.start()
            self.loader_threads.append(loader_thread)

        # Create a buffer that holds stack indices that are "ready"
        # (loaded with data). Those are the stacks we can call
        # `learn_on_loaded_batch` on.
        self.ready_tower_stacks_buffer = MinibatchBuffer(
            self.ready_tower_stacks, num_multi_gpu_tower_stacks,
            learner_queue_timeout, num_sgd_iter)
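        # Note: The buffer hands each ready stack index out `num_sgd_iter`
        # times in total (round-robin across stacks). `released` is True
        # on the last hand-out, meaning the index may be returned to
        # `idle_tower_stacks` (see `step()` below).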

    @override(LearnerThread)
    def step(self) -> None:
        # All loader threads must still be running.
        assert all(t.is_alive() for t in self.loader_threads)

        with self.load_wait_timer:
            buffer_idx, released = self.ready_tower_stacks_buffer.get()

        num_samples_trained = 0
        with self.grad_timer:
            # Use LearnerInfoBuilder as a unified way to build the final
            # results dict from the `learn_on_loaded_batch` call(s).
            # This makes sure results dicts always have the same structure
            # no matter the setup (multi-GPU, multi-agent, minibatch SGD,
            # tf vs torch).
            learner_info_builder = LearnerInfoBuilder(
                num_devices=len(self.devices))
            for pid in self.policy_map.keys():
                # Skip any policy that is not to be trained.
                if pid not in self.local_worker.policies_to_train:
                    continue
                policy = self.policy_map[pid]
                default_policy_results = policy.learn_on_loaded_batch(
                    offset=0, buffer_index=buffer_idx)
                learner_info_builder.add_learn_on_batch_results(
                    default_policy_results)
                self.weights_updated = True
                num_samples_trained += \
                    policy.get_num_samples_loaded_into_buffer(buffer_idx)

            self.learner_info = learner_info_builder.finalize()
            learner_stats = {
                pid: self.learner_info[pid][LEARNER_STATS_KEY]
                for pid in self.learner_info.keys()
            }
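            # `learner_stats` now maps each trained policy's ID to its
            # stats dict, e.g. {"default_policy": {"policy_loss": ...}}
            # (keys shown are illustrative).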

        if released:
            self.idle_tower_stacks.put(buffer_idx)

        self.outqueue.put((num_samples_trained, learner_stats))
        self.learner_queue_size.push(self.inqueue.qsize())


class _MultiGPULoaderThread(threading.Thread):
    def __init__(self, multi_gpu_learner_thread: MultiGPULearnerThread,
                 share_stats: bool):
        threading.Thread.__init__(self)
        self.multi_gpu_learner_thread = multi_gpu_learner_thread
        self.daemon = True
        if share_stats:
            self.queue_timer = multi_gpu_learner_thread.queue_timer
            self.load_timer = multi_gpu_learner_thread.load_timer
        else:
            self.queue_timer = TimerStat()
            self.load_timer = TimerStat()
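        # Design note: Only the first loader thread (share_stats=True)
        # writes into the learner thread's timers; the others keep private
        # TimerStats, so the reported queue/load timings come from a
        # single representative loader.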

    def run(self) -> None:
        while True:
            self._step()

    def _step(self) -> None:
        s = self.multi_gpu_learner_thread
        policy_map = s.policy_map

        # Get a new batch from the data (inqueue).
        with self.queue_timer:
            batch = s.inqueue.get()

        # Get the next idle stack for loading.
        buffer_idx = s.idle_tower_stacks.get()

        # Load the batch into the idle stack.
        with self.load_timer:
            for pid in policy_map.keys():
                if pid not in s.local_worker.policies_to_train:
                    continue
                policy = policy_map[pid]
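                # `batch` may be a single-agent SampleBatch or a
                # MultiAgentBatch; in the latter case, load only this
                # policy's own sub-batch.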
                policy.load_batch_into_buffer(
                    batch=batch if isinstance(batch, SampleBatch) else
                    batch.policy_batches[pid],
                    buffer_index=buffer_idx)

        # Tag the just-loaded stack as "ready".
        s.ready_tower_stacks.put(buffer_idx)
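

# Driver-side wiring sketch (illustrative; `worker` and `train_batches`
# are hypothetical stand-ins for the surrounding algorithm's objects):
#
#   learner = MultiGPULearnerThread(local_worker=worker)
#   learner.start()
#   for train_batch in train_batches:
#       learner.inqueue.put(train_batch)        # picked up by loader threads
#       count, stats = learner.outqueue.get()   # (num samples trained, stats)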