123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- from typing import Any, List, Dict
- import time
- from ray.actor import ActorHandle
- from ray.util.iter import LocalIterator
- from ray.rllib.evaluation.metrics import collect_episodes, summarize_episodes
- from ray.rllib.execution.common import AGENT_STEPS_SAMPLED_COUNTER, \
- STEPS_SAMPLED_COUNTER, STEPS_TRAINED_COUNTER, \
- STEPS_TRAINED_THIS_ITER_COUNTER, _get_shared_metrics
- from ray.rllib.evaluation.worker_set import WorkerSet
def StandardMetricsReporting(
        train_op: LocalIterator[Any],
        workers: WorkerSet,
        config: dict,
        selected_workers: List[ActorHandle] = None,
        by_steps_trained: bool = False,
) -> LocalIterator[dict]:
    """Operator to periodically collect and report metrics.

    Args:
        train_op (LocalIterator): Operator for executing training steps.
            We ignore the output values.
        workers (WorkerSet): Rollout workers to collect metrics from.
        config (dict): Trainer configuration, used to determine the frequency
            of stats reporting.
        selected_workers (list): Override the list of remote workers
            to collect metrics from.
        by_steps_trained (bool): If True, uses the `STEPS_TRAINED_COUNTER`
            instead of the `STEPS_SAMPLED_COUNTER` in metrics.

    Returns:
        LocalIterator[dict]: A local iterator over training results.

    Examples:
        >>> train_op = ParallelRollouts(...).for_each(TrainOneStep(...))
        >>> metrics_op = StandardMetricsReporting(train_op, workers, config)
        >>> next(metrics_op)
        {"episode_reward_max": ..., "episode_reward_mean": ..., ...}
    """
    # Gate first on elapsed timesteps, then on wall-clock time, so a
    # result is only emitted when both thresholds have been crossed.
    step_gate = OncePerTimestepsElapsed(
        config["timesteps_per_iteration"], by_steps_trained=by_steps_trained)
    time_gate = OncePerTimeInterval(config["min_iter_time_s"])
    reporter = CollectMetrics(
        workers,
        min_history=config["metrics_smoothing_episodes"],
        timeout_seconds=config["collect_metrics_timeout"],
        selected_workers=selected_workers)
    return train_op.filter(step_gate).filter(time_gate).for_each(reporter)
class CollectMetrics:
    """Callable that gathers and smooths metrics from rollout workers.

    Episode stats are averaged over a trailing history window of at most
    `min_history` episodes. Intended for use with the .for_each() operator;
    for a higher-level API, consider using StandardMetricsReporting instead.

    Examples:
        >>> output_op = train_op.for_each(CollectMetrics(workers))
        >>> print(next(output_op))
        {"episode_reward_max": ..., "episode_reward_mean": ..., ...}
    """

    def __init__(self,
                 workers: WorkerSet,
                 min_history: int = 100,
                 timeout_seconds: int = 180,
                 selected_workers: List[ActorHandle] = None):
        self.workers = workers
        self.episode_history = []  # trailing window used for smoothing
        self.to_be_collected = []  # in-flight metric fetches from workers
        self.min_history = min_history
        self.timeout_seconds = timeout_seconds
        self.selected_workers = selected_workers

    def __call__(self, _: Any) -> Dict:
        # Pull fresh episode metrics from the (possibly overridden) set of
        # remote workers, carrying over any still-pending fetches.
        remotes = self.selected_workers or self.workers.remote_workers()
        episodes, self.to_be_collected = collect_episodes(
            self.workers.local_worker(),
            remotes,
            self.to_be_collected,
            timeout_seconds=self.timeout_seconds)
        fresh_episodes = list(episodes)

        # Pad with historical episodes so the summary covers up to
        # `min_history` episodes when enough history is available.
        shortfall = self.min_history - len(episodes)
        if shortfall > 0:
            episodes = self.episode_history[-shortfall:] + episodes
            assert len(episodes) <= self.min_history
        self.episode_history.extend(fresh_episodes)
        self.episode_history = self.episode_history[-self.min_history:]
        res = summarize_episodes(episodes, fresh_episodes)

        # Merge in metrics tracked on the shared iterator context.
        # Pop custom metrics out of `info` first so they are reported
        # under the top-level "custom_metrics" key instead.
        metrics = _get_shared_metrics()
        custom_metrics_from_info = metrics.info.pop("custom_metrics", {})
        counters = dict(metrics.counters)
        info = dict(metrics.info)
        timers = {}
        for name, timer in metrics.timers.items():
            timers["{}_time_ms".format(name)] = round(timer.mean * 1000, 3)
            if timer.has_units_processed():
                timers["{}_throughput".format(name)] = round(
                    timer.mean_throughput, 3)

        res.update({
            "num_healthy_workers": len(self.workers.remote_workers()),
            "timesteps_total": metrics.counters[STEPS_SAMPLED_COUNTER],
            # tune.Trainable uses timesteps_this_iter for tracking
            # total timesteps.
            "timesteps_this_iter": metrics.counters[
                STEPS_TRAINED_THIS_ITER_COUNTER],
            "agent_timesteps_total": metrics.counters.get(
                AGENT_STEPS_SAMPLED_COUNTER, 0),
        })
        res["timers"] = timers
        res["info"] = info
        res["info"].update(counters)
        res["custom_metrics"] = res.get("custom_metrics", {})
        res["episode_media"] = res.get("episode_media", {})
        res["custom_metrics"].update(custom_metrics_from_info)
        return res
class OncePerTimeInterval:
    """Callable that returns True at most once per given time interval.

    This should be used with the .filter() operator to throttle / rate-limit
    metrics reporting. For a higher-level API, consider using
    StandardMetricsReporting instead.

    Examples:
        >>> throttled_op = train_op.filter(OncePerTimeInterval(5))
        >>> start = time.time()
        >>> next(throttled_op)
        >>> print(time.time() - start)
        5.00001 # will be greater than 5 seconds
    """

    def __init__(self, delay: int):
        self.delay = delay
        # Starts at 0 so the very first call always passes the gate.
        self.last_called = 0

    def __call__(self, item: Any) -> bool:
        # A non-positive delay disables throttling entirely.
        if self.delay <= 0.0:
            return True
        now = time.time()
        elapsed = now - self.last_called
        if elapsed <= self.delay:
            return False
        self.last_called = now
        return True
class OncePerTimestepsElapsed:
    """Callable that returns True once per given number of timesteps.

    This should be used with the .filter() operator to throttle / rate-limit
    metrics reporting. For a higher-level API, consider using
    StandardMetricsReporting instead.

    Examples:
        >>> throttled_op = train_op.filter(OncePerTimestepsElapsed(1000))
        >>> next(throttled_op)
        # will only return after 1000 steps have elapsed
    """

    def __init__(self, delay_steps: int, by_steps_trained: bool = False):
        """
        Args:
            delay_steps (int): The number of steps (sampled or trained) every
                which this op returns True.
            by_steps_trained (bool): If True, uses the `STEPS_TRAINED_COUNTER`
                instead of the `STEPS_SAMPLED_COUNTER` in metrics.
        """
        self.delay_steps = delay_steps
        self.by_steps_trained = by_steps_trained
        self.last_called = 0

    def __call__(self, item: Any) -> bool:
        # A non-positive step delay disables throttling entirely.
        if self.delay_steps <= 0:
            return True
        metrics = _get_shared_metrics()
        counter_key = (STEPS_TRAINED_COUNTER
                       if self.by_steps_trained else STEPS_SAMPLED_COUNTER)
        current = metrics.counters[counter_key]
        if current - self.last_called < self.delay_steps:
            return False
        self.last_called = current
        return True
|