123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- from collections import defaultdict, deque
- import logging
- import platform
- from typing import Any, Dict, List, Optional, Sequence, Tuple, Type
- import ray
- from ray.actor import ActorClass, ActorHandle
- from ray.rllib.utils.deprecation import Deprecated
- logger = logging.getLogger(__name__)
class TaskPool:
    """Book-keeping helper for a set of outstanding actor tasks.

    Every task is tracked under a single "key" object ref and remembers
    both the worker it was registered for and the full payload that was
    handed to `add()`.
    """

    def __init__(self):
        # Key object ref -> worker the task was registered for.
        self._tasks = {}
        # Key object ref -> payload exactly as passed to `add()`.
        self._objects = {}
        # Finished (worker, payload) pairs queued by `completed_prefetch()`.
        self._fetching = deque()

    def add(self, worker, all_obj_refs):
        """Start tracking a task of `worker`.

        Args:
            worker: The worker to associate with the task.
            all_obj_refs: A single object ref or a list of refs. For a
                list, the first element serves as the tracking key.
        """
        is_multi = isinstance(all_obj_refs, list)
        key = all_obj_refs[0] if is_multi else all_obj_refs
        self._tasks[key] = worker
        self._objects[key] = all_obj_refs

    def completed(self, blocking_wait=False):
        """Yield `(worker, payload)` for tracked tasks reported as ready.

        First polls all pending key refs via `ray.wait(..., timeout=0)`.
        If that reports nothing and `blocking_wait` is set, waits once
        more for a single ref with a 10 second timeout. Yielded tasks are
        removed from the pool.
        """
        in_flight = list(self._tasks)
        if not in_flight:
            return

        done, _ = ray.wait(in_flight, num_returns=len(in_flight), timeout=0)
        if blocking_wait and not done:
            done, _ = ray.wait(in_flight, num_returns=1, timeout=10.0)

        for key in done:
            worker = self._tasks.pop(key)
            payload = self._objects.pop(key)
            yield worker, payload

    def completed_prefetch(self, blocking_wait=False, max_yield=999):
        """Like `completed()`, but buffered and capped at `max_yield` items.

        All tasks reported by `completed()` are first moved into an
        internal FIFO queue; then at most `max_yield` queued
        `(worker, payload)` pairs are yielded, oldest first. Items left
        over stay queued for the next call.
        Assumes each payload is a single object ref.
        """
        self._fetching.extend(self.completed(blocking_wait=blocking_wait))

        for _ in range(max_yield):
            try:
                item = self._fetching.popleft()
            except IndexError:
                # Queue drained before reaching `max_yield`.
                return
            yield item

    def reset_workers(self, workers):
        """Forget every task whose worker is not contained in `workers`."""
        orphaned = [
            key for key, owner in self._tasks.items() if owner not in workers
        ]
        for key in orphaned:
            del self._tasks[key]
            del self._objects[key]

        # Rebuild the queue in place: generators that are still in flight
        # hold on to this very deque object, so it must not be replaced.
        survivors = [item for item in self._fetching if item[0] in workers]
        self._fetching.clear()
        self._fetching.extend(survivors)

    @property
    def count(self):
        """Number of tasks currently tracked as in flight."""
        return len(self._tasks)
def create_colocated_actors(
    actor_specs: Sequence[Tuple[Type, Any, Any, int]],
    node: Optional[str] = "localhost",
    max_attempts: int = 10,
) -> List[List[ActorHandle]]:
    """Create co-located actors of any type(s) on any node.

    Args:
        actor_specs: Tuple/list with tuples consisting of: 1) The
            (already @ray.remote) class(es) to construct, 2) c'tor args,
            3) c'tor kwargs, and 4) the number of actors of that class with
            given args/kwargs to construct. Args and kwargs may be None.
        node: The node to co-locate the actors on. By default ("localhost"),
            place the actors on the node the caller of this function is
            located on. Use None for indicating that any (resource fulfilling)
            node in the cluster may be used.
        max_attempts: The maximum number of co-location attempts to
            perform before throwing an error.

    Returns:
        A list with one entry per item in `actor_specs` (same order). Each
        entry is the list of ActorHandles created (and co-located) for
        that spec, e.g. `result[0]` holds the actors of `actor_specs[0]`.

    Raises:
        Exception: If not all specs could be fulfilled within
            `max_attempts` attempts.
    """
    if node == "localhost":
        node = platform.node()

    # Maps each entry in `actor_specs` to lists of already co-located actors.
    ok: List[List[ActorHandle]] = [[] for _ in actor_specs]

    # Try n times to co-locate all given actor types (`actor_specs`).
    # With each (failed) attempt, increase the number of actors we try to
    # create (on the same node), then kill the ones that have been created in
    # excess.
    for attempt in range(max_attempts):
        # If any attempt to co-locate fails, set this to False and we'll do
        # another attempt.
        all_good = True
        # Process all `actor_specs` in sequence.
        for i, (typ, args, kwargs, count) in enumerate(actor_specs):
            args = args or []  # Allow None.
            kwargs = kwargs or {}  # Allow None.

            # We don't have enough actors yet of this spec co-located on
            # the desired node.
            if len(ok[i]) < count:
                co_located = try_create_colocated(
                    cls=typ,
                    args=args,
                    kwargs=kwargs,
                    count=count * (attempt + 1),
                    node=node,
                )

                # If node did not matter (None), from here on, use the host
                # that the first actor(s) are already co-located on.
                if node is None:
                    node = ray.get(co_located[0].get_host.remote())

                # Add the newly co-located actors to the `ok` list.
                ok[i].extend(co_located)

                # If we still don't have enough -> We'll have to do another
                # attempt.
                if len(ok[i]) < count:
                    all_good = False

            # We created too many actors for this spec -> Kill/truncate
            # the excess ones.
            if len(ok[i]) > count:
                for a in ok[i][count:]:
                    a.__ray_terminate__.remote()
                ok[i] = ok[i][:count]

        # All `actor_specs` have been fulfilled, return lists of
        # co-located actors.
        if all_good:
            return ok

    raise Exception("Unable to create enough colocated actors -> aborting.")
def try_create_colocated(
    cls: Type[ActorClass],
    args: Optional[List[Any]],
    count: int,
    kwargs: Optional[Dict[str, Any]] = None,
    node: Optional[str] = "localhost",
) -> List[ActorHandle]:
    """Tries to co-locate (same node) a set of Actors of the same type.

    Returns a list of successfully co-located actors. All actors that could
    not be co-located (with the others on the given node) will not be in this
    list.

    Creates `count` actors via `cls.remote()`, then checks which of them
    have been co-located (on the same node). Those that have not are
    terminated.

    Args:
        cls: The Actor class to use (already @ray.remote "converted").
        args: Positional args to pass to the Actor's constructor. The same
            args are used for every one of the `count` actors. None is
            treated as "no positional args".
        count: Number of actors of the given `cls` to construct.
        kwargs: Optional dict of keyword args to pass to the Actor's
            constructor. The same kwargs are used for every actor.
        node: The node to co-locate the actors on. By default ("localhost"),
            place the actors on the node the caller of this function is
            located on. If None, will try to co-locate all actors on
            any available node.

    Returns:
        List containing all successfully co-located actor handles.
    """
    if node == "localhost":
        node = platform.node()

    args = args or []  # Allow None.
    kwargs = kwargs or {}  # Allow None.

    actors = [cls.remote(*args, **kwargs) for _ in range(count)]
    co_located, non_co_located = split_colocated(actors, node=node)
    logger.info("Got %s colocated actors of %s", len(co_located), count)

    # Actors that ended up on a different node are of no use -> terminate.
    for a in non_co_located:
        a.__ray_terminate__.remote()
    return co_located
def split_colocated(
    actors: List[ActorHandle],
    node: Optional[str] = "localhost",
) -> Tuple[List[ActorHandle], List[ActorHandle]]:
    """Partition `actors` into "colocated" and "non colocated" handles.

    What counts as colocated depends on `node`:
    - A node name (or the default "localhost", which is resolved through
      `platform.node()`): every actor whose `get_host()` equals that name.
    - None: the largest group of actors reporting the same host, whichever
      host that is. On a tie, the host encountered first wins.

    Args:
        actors: The actor handles to partition. Each one is queried via
            its remote `get_host` method.
        node: The node defining the colocation criterion (see above).

    Returns:
        Tuple of two lists: 1) Co-located ActorHandles, 2) non co-located
        ActorHandles.
    """
    target = platform.node() if node == "localhost" else node

    # Resolve the host of every actor with one batched `ray.get`.
    actor_hosts = ray.get([handle.get_host.remote() for handle in actors])

    # Explicit target node: an actor is colocated iff it sits on `target`.
    if target is not None:
        on_target = []
        elsewhere = []
        for actor_host, handle in zip(actor_hosts, actors):
            bucket = on_target if actor_host == target else elsewhere
            bucket.append(handle)
        return on_target, elsewhere

    # No target node: group the actors by host and keep the biggest group.
    by_host = defaultdict(set)
    for actor_host, handle in zip(actor_hosts, actors):
        by_host[actor_host].add(handle)

    # `max` returns the first maximal key, i.e. the earliest host on a tie.
    biggest_host = max(
        by_host, key=lambda host: len(by_host[host]), default=None)

    elsewhere = [
        handle
        for actor_host, members in by_host.items()
        if actor_host != biggest_host
        for handle in members
    ]
    # For an empty `actors` list, the defaultdict lookup yields an empty set.
    return list(by_host[biggest_host]), elsewhere
@Deprecated(new="create_colocated_actors", error=False)
def create_colocated(cls, arg, count):
    """Deprecated: create `count` actors of `cls` on the caller's node.

    Thin wrapper around `create_colocated_actors()` with a single actor
    spec and no constructor kwargs.

    Args:
        cls: The (already @ray.remote) actor class to construct.
        arg: Positional constructor args, shared by all created actors.
        count: The number of actors to create.

    Returns:
        The list of co-located actor handles created for `cls`.
    """
    kwargs = {}
    args = arg

    # `create_colocated_actors` returns one list of handles per entry in
    # `actor_specs` (in order). Exactly one spec is passed here, so its
    # actors are at position 0; the result cannot be indexed by `cls`.
    return create_colocated_actors(
        actor_specs=[(cls, args, kwargs, count)],
        node=platform.node(),  # force on localhost
    )[0]
@Deprecated(error=False)
def drop_colocated(actors: List[ActorHandle]) -> List[ActorHandle]:
    """Deprecated: terminate the actors colocated with the caller.

    Partitions `actors` with `split_colocated()` using its default node
    ("localhost"), sends a termination request to every colocated actor
    and hands back the remaining ones.

    Args:
        actors: The actor handles to filter.

    Returns:
        The handles that `split_colocated()` reported as non colocated.
    """
    on_this_node, elsewhere = split_colocated(actors)
    for handle in on_this_node:
        handle.__ray_terminate__.remote()
    return elsewhere
|