actors.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. from collections import defaultdict, deque
  2. import logging
  3. import platform
  4. from typing import Any, Dict, List, Optional, Sequence, Tuple, Type
  5. import ray
  6. from ray.actor import ActorClass, ActorHandle
  7. from ray.rllib.utils.deprecation import Deprecated
# Module-level logger; `try_create_colocated` reports through it how many of
# the requested actors ended up co-located.
logger = logging.getLogger(__name__)
  9. class TaskPool:
  10. """Helper class for tracking the status of many in-flight actor tasks."""
  11. def __init__(self):
  12. self._tasks = {}
  13. self._objects = {}
  14. self._fetching = deque()
  15. def add(self, worker, all_obj_refs):
  16. if isinstance(all_obj_refs, list):
  17. obj_ref = all_obj_refs[0]
  18. else:
  19. obj_ref = all_obj_refs
  20. self._tasks[obj_ref] = worker
  21. self._objects[obj_ref] = all_obj_refs
  22. def completed(self, blocking_wait=False):
  23. pending = list(self._tasks)
  24. if pending:
  25. ready, _ = ray.wait(pending, num_returns=len(pending), timeout=0)
  26. if not ready and blocking_wait:
  27. ready, _ = ray.wait(pending, num_returns=1, timeout=10.0)
  28. for obj_ref in ready:
  29. yield (self._tasks.pop(obj_ref), self._objects.pop(obj_ref))
  30. def completed_prefetch(self, blocking_wait=False, max_yield=999):
  31. """Similar to completed but only returns once the object is local.
  32. Assumes obj_ref only is one id."""
  33. for worker, obj_ref in self.completed(blocking_wait=blocking_wait):
  34. self._fetching.append((worker, obj_ref))
  35. for _ in range(max_yield):
  36. if not self._fetching:
  37. break
  38. yield self._fetching.popleft()
  39. def reset_workers(self, workers):
  40. """Notify that some workers may be removed."""
  41. for obj_ref, ev in self._tasks.copy().items():
  42. if ev not in workers:
  43. del self._tasks[obj_ref]
  44. del self._objects[obj_ref]
  45. # We want to keep the same deque reference so that we don't suffer from
  46. # stale references in generators that are still in flight
  47. for _ in range(len(self._fetching)):
  48. ev, obj_ref = self._fetching.popleft()
  49. if ev in workers:
  50. # Re-queue items that are still valid
  51. self._fetching.append((ev, obj_ref))
  52. @property
  53. def count(self):
  54. return len(self._tasks)
  55. def create_colocated_actors(
  56. actor_specs: Sequence[Tuple[Type, Any, Any, int]],
  57. node: Optional[str] = "localhost",
  58. max_attempts: int = 10,
  59. ) -> Dict[Type, List[ActorHandle]]:
  60. """Create co-located actors of any type(s) on any node.
  61. Args:
  62. actor_specs: Tuple/list with tuples consisting of: 1) The
  63. (already @ray.remote) class(es) to construct, 2) c'tor args,
  64. 3) c'tor kwargs, and 4) the number of actors of that class with
  65. given args/kwargs to construct.
  66. node: The node to co-locate the actors on. By default ("localhost"),
  67. place the actors on the node the caller of this function is
  68. located on. Use None for indicating that any (resource fulfilling)
  69. node in the cluster may be used.
  70. max_attempts: The maximum number of co-location attempts to
  71. perform before throwing an error.
  72. Returns:
  73. A dict mapping the created types to the list of n ActorHandles
  74. created (and co-located) for that type.
  75. """
  76. if node == "localhost":
  77. node = platform.node()
  78. # Maps each entry in `actor_specs` to lists of already co-located actors.
  79. ok = [[] for _ in range(len(actor_specs))]
  80. # Try n times to co-locate all given actor types (`actor_specs`).
  81. # With each (failed) attempt, increase the number of actors we try to
  82. # create (on the same node), then kill the ones that have been created in
  83. # excess.
  84. for attempt in range(max_attempts):
  85. # If any attempt to co-locate fails, set this to False and we'll do
  86. # another attempt.
  87. all_good = True
  88. # Process all `actor_specs` in sequence.
  89. for i, (typ, args, kwargs, count) in enumerate(actor_specs):
  90. args = args or [] # Allow None.
  91. kwargs = kwargs or {} # Allow None.
  92. # We don't have enough actors yet of this spec co-located on
  93. # the desired node.
  94. if len(ok[i]) < count:
  95. co_located = try_create_colocated(
  96. cls=typ,
  97. args=args,
  98. kwargs=kwargs,
  99. count=count * (attempt + 1),
  100. node=node)
  101. # If node did not matter (None), from here on, use the host
  102. # that the first actor(s) are already co-located on.
  103. if node is None:
  104. node = ray.get(co_located[0].get_host.remote())
  105. # Add the newly co-located actors to the `ok` list.
  106. ok[i].extend(co_located)
  107. # If we still don't have enough -> We'll have to do another
  108. # attempt.
  109. if len(ok[i]) < count:
  110. all_good = False
  111. # We created too many actors for this spec -> Kill/truncate
  112. # the excess ones.
  113. if len(ok[i]) > count:
  114. for a in ok[i][count:]:
  115. a.__ray_terminate__.remote()
  116. ok[i] = ok[i][:count]
  117. # All `actor_specs` have been fulfilled, return lists of
  118. # co-located actors.
  119. if all_good:
  120. return ok
  121. raise Exception("Unable to create enough colocated actors -> aborting.")
  122. def try_create_colocated(
  123. cls: Type[ActorClass],
  124. args: List[Any],
  125. count: int,
  126. kwargs: Optional[List[Any]] = None,
  127. node: Optional[str] = "localhost",
  128. ) -> List[ActorHandle]:
  129. """Tries to co-locate (same node) a set of Actors of the same type.
  130. Returns a list of successfully co-located actors. All actors that could
  131. not be co-located (with the others on the given node) will not be in this
  132. list.
  133. Creates each actor via it's remote() constructor and then checks, whether
  134. it has been co-located (on the same node) with the other (already created)
  135. ones. If not, terminates the just created actor.
  136. Args:
  137. cls: The Actor class to use (already @ray.remote "converted").
  138. args: List of args to pass to the Actor's constructor. One item
  139. per to-be-created actor (`count`).
  140. count: Number of actors of the given `cls` to construct.
  141. kwargs: Optional list of kwargs to pass to the Actor's constructor.
  142. One item per to-be-created actor (`count`).
  143. node: The node to co-locate the actors on. By default ("localhost"),
  144. place the actors on the node the caller of this function is
  145. located on. If None, will try to co-locate all actors on
  146. any available node.
  147. Returns:
  148. List containing all successfully co-located actor handles.
  149. """
  150. if node == "localhost":
  151. node = platform.node()
  152. kwargs = kwargs or {}
  153. actors = [cls.remote(*args, **kwargs) for _ in range(count)]
  154. co_located, non_co_located = split_colocated(actors, node=node)
  155. logger.info("Got {} colocated actors of {}".format(len(co_located), count))
  156. for a in non_co_located:
  157. a.__ray_terminate__.remote()
  158. return co_located
  159. def split_colocated(
  160. actors: List[ActorHandle],
  161. node: Optional[str] = "localhost",
  162. ) -> Tuple[List[ActorHandle], List[ActorHandle]]:
  163. """Splits up given actors into colocated (on same node) and non colocated.
  164. The co-location criterion depends on the `node` given:
  165. If given (or default: platform.node()): Consider all actors that are on
  166. that node "colocated".
  167. If None: Consider the largest sub-set of actors that are all located on
  168. the same node (whatever that node is) as "colocated".
  169. Args:
  170. actors: The list of actor handles to split into "colocated" and
  171. "non colocated".
  172. node: The node defining "colocation" criterion. If provided, consider
  173. thos actors "colocated" that sit on this node. If None, use the
  174. largest subset within `actors` that are sitting on the same
  175. (any) node.
  176. Returns:
  177. Tuple of two lists: 1) Co-located ActorHandles, 2) non co-located
  178. ActorHandles.
  179. """
  180. if node == "localhost":
  181. node = platform.node()
  182. # Get nodes of all created actors.
  183. hosts = ray.get([a.get_host.remote() for a in actors])
  184. # If `node` not provided, use the largest group of actors that sit on the
  185. # same node, regardless of what that node is.
  186. if node is None:
  187. node_groups = defaultdict(set)
  188. for host, actor in zip(hosts, actors):
  189. node_groups[host].add(actor)
  190. max_ = -1
  191. largest_group = None
  192. for host in node_groups:
  193. if max_ < len(node_groups[host]):
  194. max_ = len(node_groups[host])
  195. largest_group = host
  196. non_co_located = []
  197. for host in node_groups:
  198. if host != largest_group:
  199. non_co_located.extend(list(node_groups[host]))
  200. return list(node_groups[largest_group]), non_co_located
  201. # Node provided (or default: localhost): Consider those actors "colocated"
  202. # that were placed on `node`.
  203. else:
  204. # Split into co-located (on `node) and non-co-located (not on `node`).
  205. co_located = []
  206. non_co_located = []
  207. for host, a in zip(hosts, actors):
  208. # This actor has been placed on the correct node.
  209. if host == node:
  210. co_located.append(a)
  211. # This actor has been placed on a different node.
  212. else:
  213. non_co_located.append(a)
  214. return co_located, non_co_located
  215. @Deprecated(new="create_colocated_actors", error=False)
  216. def create_colocated(cls, arg, count):
  217. kwargs = {}
  218. args = arg
  219. return create_colocated_actors(
  220. actor_specs=[(cls, args, kwargs, count)],
  221. node=platform.node(), # force on localhost
  222. )[cls]
  223. @Deprecated(error=False)
  224. def drop_colocated(actors: List[ActorHandle]) -> List[ActorHandle]:
  225. colocated, non_colocated = split_colocated(actors)
  226. for a in colocated:
  227. a.__ray_terminate__.remote()
  228. return non_colocated