Advanced Usage
==============

This page will cover some more advanced examples of using Ray's flexible programming model.

.. contents::
  :local:

Synchronization
---------------

Tasks or actors can often contend over the same resource or need to communicate with each other. Here are some standard ways to perform synchronization across Ray processes.

Inter-process synchronization using FileLock
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you have several tasks or actors writing to the same file or downloading a file on a single node, you can use `FileLock <https://pypi.org/project/filelock/>`_ to synchronize.
This often occurs for data loading and preprocessing.

.. code-block:: python

    import ray
    from filelock import FileLock

    @ray.remote
    def write_to_file(text):
        # Create a filelock object. Consider using an absolute path for the lock.
        with FileLock("my_data.txt.lock"):
            with open("my_data.txt", "a") as f:
                f.write(text)

    ray.init()

    ray.get([write_to_file.remote("hi there!\n") for i in range(3)])

    with open("my_data.txt") as f:
        print(f.read())

    ## Output is:
    # hi there!
    # hi there!
    # hi there!

Multi-node synchronization using ``SignalActor``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When you have multiple tasks that need to wait on some condition, you can use a ``SignalActor`` to coordinate.

.. code-block:: python

    # Also available via `from ray.test_utils import SignalActor`
    import asyncio

    import ray

    @ray.remote(num_cpus=0)
    class SignalActor:
        def __init__(self):
            self.ready_event = asyncio.Event()

        def send(self, clear=False):
            self.ready_event.set()
            if clear:
                self.ready_event.clear()

        async def wait(self, should_wait=True):
            if should_wait:
                await self.ready_event.wait()

    @ray.remote
    def wait_and_go(signal):
        ray.get(signal.wait.remote())
        print("go!")

    ray.init()

    signal = SignalActor.remote()
    tasks = [wait_and_go.remote(signal) for _ in range(4)]
    print("ready...")
    # Tasks will all be waiting for the signal.
    print("set..")
    ray.get(signal.send.remote())

    # Tasks are unblocked.
    ray.get(tasks)

    ## Output is:
    # ready...
    # set..
    # (pid=77366) go!
    # (pid=77372) go!
    # (pid=77367) go!
    # (pid=77358) go!

Message passing using Ray Queue
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Sometimes just using one signal to synchronize is not enough. If you need to send data among many tasks or
actors, you can use :ref:`ray.util.queue.Queue <ray-queue-ref>`.

.. code-block:: python

    import ray
    from ray.util.queue import Queue

    ray.init()

    # You can pass this object around to different tasks/actors
    queue = Queue(maxsize=100)

    @ray.remote
    def consumer(queue):
        next_item = queue.get(block=True)
        print(f"got work {next_item}")

    consumers = [consumer.remote(queue) for _ in range(2)]

    [queue.put(i) for i in range(10)]

Ray's ``Queue`` has an API similar to Python's ``asyncio.Queue`` and ``queue.Queue``.

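As a brief, non-exhaustive sketch of that similarity, continuing from the initialized Ray session above and assuming the ``queue.Queue``-style helpers (``put_nowait``, ``get_nowait``, ``qsize``, ``empty``, ``full``) behave like their standard-library counterparts:

.. code-block:: python

    from ray.util.queue import Queue

    queue = Queue(maxsize=2)

    queue.put("a")            # Blocking put, as in queue.Queue.
    queue.put_nowait("b")     # Non-blocking put; raises Full if the queue is full.
    assert queue.qsize() == 2
    assert queue.full()

    assert queue.get() == "a"         # Blocking get.
    assert queue.get_nowait() == "b"  # Non-blocking get; raises Empty if the queue is empty.
    assert queue.empty()
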
Dynamic Remote Parameters
-------------------------

You can dynamically adjust resource requirements or return values of ``ray.remote`` during execution with ``.options``.

For example, here we instantiate many copies of the same actor with varying resource requirements. Note that to create these actors successfully, Ray will need to be started with sufficient CPU resources and the relevant custom resources:

.. code-block:: python

    @ray.remote(num_cpus=4)
    class Counter(object):
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

    a1 = Counter.options(num_cpus=1, resources={"Custom1": 1}).remote()
    a2 = Counter.options(num_cpus=2, resources={"Custom2": 1}).remote()
    a3 = Counter.options(num_cpus=3, resources={"Custom3": 1}).remote()

You can specify different resource requirements for tasks (but not for actor methods):

.. code-block:: python

    @ray.remote
    def g():
        return ray.get_gpu_ids()

    # By default, g requests no GPUs, so no GPU IDs are assigned.
    object_gpu_ids = g.remote()
    assert ray.get(object_gpu_ids) == []

    # With .options, the same task can request a GPU.
    # (Assumes the cluster has at least one GPU.)
    dynamic_object_gpu_ids = g.options(num_cpus=1, num_gpus=1).remote()
    assert ray.get(dynamic_object_gpu_ids) == [0]

And vary the number of return values for tasks (and actor methods too):

.. code-block:: python

    @ray.remote
    def f(n):
        return list(range(n))

    id1, id2 = f.options(num_returns=2).remote(2)
    assert ray.get(id1) == 0
    assert ray.get(id2) == 1

And specify a name for tasks (and actor methods too) at task submission time:

.. code-block:: python

    import setproctitle

    @ray.remote
    def f(x):
        assert setproctitle.getproctitle() == "ray::special_f"
        return x + 1

    obj = f.options(name="special_f").remote(3)
    assert ray.get(obj) == 4

This name will appear as the task name in the machine view of the dashboard, as the
worker process name while the task is executing (if it is a Python task), and as the
task name in the logs.

.. image:: images/task_name_dashboard.png

Accelerator Types
-----------------

Ray supports resource-specific accelerator types. The ``accelerator_type`` option can be used to force a task to run on a node with a specific type of accelerator. Under the hood, the accelerator type option is implemented as a custom resource demand of ``"accelerator_type:<type>": 0.001``. This forces the task to be placed on a node with that particular accelerator type available. This also lets the multi-node-type autoscaler know that there is demand for that type of resource, potentially triggering the launch of new nodes providing that accelerator.

.. code-block:: python

    from ray.util.accelerators import NVIDIA_TESLA_V100

    @ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
    def train(data):
        return "This function was run on a node with a Tesla V100 GPU"

See ``ray.util.accelerators`` for available accelerator types. Currently, automatically detected accelerator types include Nvidia GPUs.

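For intuition only, here is a sketch of what that custom resource demand looks like written out by hand. The resource string ``"accelerator_type:V100"`` is assumed from the description above and is an internal implementation detail that may change between Ray versions, so prefer the ``accelerator_type`` option in real code:

.. code-block:: python

    # Sketch only: request the internal accelerator-type custom resource directly.
    # This is conceptually what accelerator_type=NVIDIA_TESLA_V100 expands to;
    # the exact resource name is not a stable public API.
    @ray.remote(num_gpus=1, resources={"accelerator_type:V100": 0.001})
    def train_explicit(data):
        return "This function also targets a node advertising a V100 accelerator"
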
Overloaded Functions
--------------------

The Ray Java API supports calling overloaded Java functions remotely. However, due to the limitations of Java compiler type inference, one must explicitly cast the method reference to the correct function type. For example, consider the following.

Overloaded normal task call:

.. code:: java

    public static class MyRayApp {

      public static int overloadFunction() {
        return 1;
      }

      public static int overloadFunction(int x) {
        return x;
      }
    }

    // Invoke overloaded functions.
    Assert.assertEquals((int) Ray.task((RayFunc0<Integer>) MyRayApp::overloadFunction).remote().get(), 1);
    Assert.assertEquals((int) Ray.task((RayFunc1<Integer, Integer>) MyRayApp::overloadFunction, 2).remote().get(), 2);

Overloaded actor task call:

.. code:: java

    public static class Counter {
      protected int value = 0;

      public int increment() {
        this.value += 1;
        return this.value;
      }
    }

    public static class CounterOverloaded extends Counter {
      public int increment(int diff) {
        super.value += diff;
        return super.value;
      }

      public int increment(int diff1, int diff2) {
        super.value += diff1 + diff2;
        return super.value;
      }
    }

.. code:: java

    ActorHandle<CounterOverloaded> a = Ray.actor(CounterOverloaded::new).remote();
    // Call an overloaded actor method by super class method reference.
    Assert.assertEquals((int) a.task(Counter::increment).remote().get(), 1);
    // Call an overloaded actor method; cast the method reference first.
    a.task((RayFunc1<CounterOverloaded, Integer>) CounterOverloaded::increment).remote();
    a.task((RayFunc2<CounterOverloaded, Integer, Integer>) CounterOverloaded::increment, 10).remote();
    a.task((RayFunc3<CounterOverloaded, Integer, Integer, Integer>) CounterOverloaded::increment, 10, 10).remote();
    Assert.assertEquals((int) a.task(Counter::increment).remote().get(), 33);

Nested Remote Functions
-----------------------

Remote functions can call other remote functions, resulting in nested tasks.
For example, consider the following.

.. code:: python

    @ray.remote
    def f():
        return 1

    @ray.remote
    def g():
        # Call f 4 times and return the resulting object refs.
        return [f.remote() for _ in range(4)]

    @ray.remote
    def h():
        # Call f 4 times, block until those 4 tasks finish,
        # retrieve the results, and return the values.
        return ray.get([f.remote() for _ in range(4)])

Then calling ``g`` and ``h`` produces the following behavior.

.. code:: python

    >>> ray.get(g.remote())
    [ObjectRef(b1457ba0911ae84989aae86f89409e953dd9a80e),
     ObjectRef(7c14a1d13a56d8dc01e800761a66f09201104275),
     ObjectRef(99763728ffc1a2c0766a2000ebabded52514e9a6),
     ObjectRef(9c2f372e1933b04b2936bb6f58161285829b9914)]

    >>> ray.get(h.remote())
    [1, 1, 1, 1]

**One limitation** is that the definition of ``f`` must come before the
definitions of ``g`` and ``h`` because as soon as ``g`` is defined, it
will be pickled and shipped to the workers, and so if ``f`` hasn't been
defined yet, the definition will be incomplete.

Circular Dependencies
---------------------

Consider the following remote function.

.. code-block:: python

    @ray.remote(num_cpus=1, num_gpus=1)
    def g():
        return ray.get(f.remote())

When a ``g`` task is executing, it will release its CPU resources when it gets
blocked in the call to ``ray.get``. It will reacquire the CPU resources when
``ray.get`` returns. It will retain its GPU resources throughout the lifetime of
the task because the task will most likely continue to use GPU memory.

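To see why this release behavior matters, here is a self-contained sketch; the single-CPU, single-GPU resource counts are illustrative, not part of the original example. If ``g`` kept holding its CPU while blocked in ``ray.get``, the inner ``f`` task could never be scheduled on a one-CPU node and the program would deadlock:

.. code-block:: python

    import ray

    # Illustrative resource counts: one CPU and one GPU in total.
    ray.init(num_cpus=1, num_gpus=1)

    @ray.remote
    def f():
        return 1

    @ray.remote(num_cpus=1, num_gpus=1)
    def g():
        # While blocked in ray.get, g releases its CPU so that f can run on the
        # same single-CPU node; g keeps its GPU for its entire lifetime.
        return ray.get(f.remote())

    assert ray.get(g.remote()) == 1
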
Cython Code in Ray
------------------

To use Cython code in Ray, run the following from directory ``$RAY_HOME/examples/cython``:

.. code-block:: bash

    pip install scipy # For BLAS example
    pip install -e .
    python cython_main.py --help

You can import the ``cython_examples`` module from a Python script or interpreter.

Notes
~~~~~

* You **must** include the following two lines at the top of any ``*.pyx`` file:

  .. code-block:: python

      #!python
      # cython: embedsignature=True, binding=True

* You cannot decorate Cython functions within a ``*.pyx`` file (there are ways around this, but doing so creates a leaky abstraction between Cython and Python that would be very challenging to support generally). Instead, prefer the following in your Python code (see the sketch after this list):

  .. code-block:: python

      some_cython_func = ray.remote(some_cython_module.some_cython_func)

* You cannot transfer memory buffers to a remote function (see ``example8``, which currently fails); your remote function must return a value.
* Have a look at ``cython_main.py``, ``cython_simple.pyx``, and ``setup.py`` for examples of how to call, define, and build Cython code, respectively. The Cython `documentation <http://cython.readthedocs.io/>`_ is also very helpful.
* Several limitations come from Cython's own `unsupported <https://github.com/cython/cython/wiki/Unsupported>`_ Python features.
* We currently do not support compiling and distributing Cython code to ``ray`` clusters. In other words, Cython developers are responsible for compiling and distributing any Cython code to their cluster (much as would be the case for users who need Python packages like ``scipy``).
* For most simple use cases, developers need not worry about Python 2 or 3, but users who do need to care can have a look at the ``language_level`` Cython compiler directive (see `here <http://cython.readthedocs.io/en/latest/src/reference/compilation.html>`_).

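As a minimal sketch of that wrapping pattern, using a hypothetical compiled module and function name (``some_cython_module.some_cython_func``); substitute the real functions from ``cython_simple.pyx``:

.. code-block:: python

    import ray

    # Hypothetical compiled Cython module built via `pip install -e .` above;
    # substitute one of the real modules from $RAY_HOME/examples/cython.
    from cython_examples import some_cython_module

    ray.init()

    # Wrap the compiled Cython function instead of decorating it inside the .pyx file.
    remote_cython_func = ray.remote(some_cython_module.some_cython_func)

    # Invoke it like any other Ray remote function.
    print(ray.get(remote_cython_func.remote(42)))
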
Inspecting Cluster State
------------------------

Applications written on top of Ray will often want to have some information
or diagnostics about the cluster. Some common questions include:

1. How many nodes are in my autoscaling cluster?
2. What resources are currently available in my cluster, both used and total?
3. What are the objects currently in my cluster?

For this, you can use the global state API.

Node Information
~~~~~~~~~~~~~~~~

To get information about the current nodes in your cluster, you can use ``ray.nodes()``:

.. autofunction:: ray.nodes
    :noindex:

.. code-block:: python

    import ray

    ray.init()
    print(ray.nodes())

    """
    [{'NodeID': '2691a0c1aed6f45e262b2372baf58871734332d7',
      'Alive': True,
      'NodeManagerAddress': '192.168.1.82',
      'NodeManagerHostname': 'host-MBP.attlocal.net',
      'NodeManagerPort': 58472,
      'ObjectManagerPort': 52383,
      'ObjectStoreSocketName': '/tmp/ray/session_2020-08-04_11-00-17_114725_17883/sockets/plasma_store',
      'RayletSocketName': '/tmp/ray/session_2020-08-04_11-00-17_114725_17883/sockets/raylet',
      'MetricsExportPort': 64860,
      'alive': True,
      'Resources': {'CPU': 16.0, 'memory': 100.0, 'object_store_memory': 34.0, 'node:192.168.1.82': 1.0}}]
    """

The above information includes:

- ``NodeID``: A unique identifier for the raylet.
- ``alive``: Whether the node is still alive.
- ``NodeManagerAddress``: Private IP of the node that the raylet is on.
- ``Resources``: The total resource capacity on the node.
- ``MetricsExportPort``: The port number at which metrics are exposed through a `Prometheus endpoint <ray-metrics.html>`_.

Resource Information
~~~~~~~~~~~~~~~~~~~~

To get information about the current total resource capacity of your cluster, you can use ``ray.cluster_resources()``.

.. autofunction:: ray.cluster_resources
    :noindex:

To get information about the current available resource capacity of your cluster, you can use ``ray.available_resources()``.

.. autofunction:: ray.available_resources
    :noindex:

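For example, a short sketch comparing total and available capacity (the resource keys and quantities noted in the comments are illustrative and depend on your cluster):

.. code-block:: python

    import ray

    ray.init()

    total = ray.cluster_resources()        # e.g. {'CPU': 16.0, 'memory': ..., ...}
    available = ray.available_resources()  # Same keys, minus whatever is currently in use.

    # Illustrative: report how many CPUs are currently in use.
    cpus_in_use = total.get("CPU", 0) - available.get("CPU", 0)
    print(f"{cpus_in_use} of {total.get('CPU', 0)} CPUs are in use")
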
.. _conda-environments-for-tasks-and-actors:

Conda Environments for Tasks and Actors
---------------------------------------

Starting with Ray 1.3.0, Ray supports starting tasks and actors in `conda environments <https://docs.conda.io/en/latest/>`_.
This allows you to use tasks and actors with different (possibly conflicting) package dependencies within a single Ray runtime.

You will need to have the desired conda environments installed beforehand on all nodes in your Ray cluster, and they
must all use the same Python minor version (e.g., Python 3.8).

To start a specific task or actor in an existing conda environment, pass the environment name to your task or
actor via the ``runtime_env`` parameter as follows:

.. code-block:: python

    result = ray.get(my_task.options(runtime_env={"conda_env": "my_env"}).remote())

.. code-block:: python

    my_actor = MyActor.options(runtime_env={"conda_env": "my_env"}).remote()

Nested tasks and actors will inherit the conda environment of their parent by default.

To have Ray start all tasks and actors in a specific conda environment by default, you may
pass the desired conda environment name into ``ray.init()``:

.. code-block:: python

    from ray.job_config import JobConfig
    ray.init(job_config=JobConfig(runtime_env={"conda_env": "my_env"}))