Streaming MapReduce
===================

This document walks through how to implement a simple streaming application
using Ray's actor capabilities. It implements a streaming MapReduce that
computes word counts on Wikipedia articles.

You can view the `code for this example`_.

.. _`code for this example`: https://github.com/ray-project/ray/tree/master/doc/examples/streaming

To run the example, you need to install the dependencies:

.. code-block:: bash

    pip install wikipedia

and then execute the script as follows:

.. code-block:: bash

    python ray/doc/examples/streaming/streaming.py

For each round of articles read, the script outputs the top 10 words in these
articles together with their word counts:

.. code-block:: text

    article index = 0
    the 2866
    of 1688
    and 1448
    in 1101
    to 593
    a 553
    is 509
    as 325
    are 284
    by 261
    article index = 1
    the 3597
    of 1971
    and 1735
    in 1429
    to 670
    a 623
    is 578
    as 401
    by 293
    for 285
    article index = 2
    the 3910
    of 2123
    and 1890
    in 1468
    to 658
    a 653
    is 488
    as 364
    by 362
    for 297
    article index = 3
    the 2962
    of 1667
    and 1472
    in 1220
    a 546
    to 538
    is 516
    as 307
    by 253
    for 243
    article index = 4
    the 3523
    of 1866
    and 1690
    in 1475
    to 645
    a 583
    is 572
    as 352
    by 318
    for 306
    ...

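
The per-round ranking can be sketched as follows. This is a minimal sketch,
assuming each reducer returns a dict mapping words to counts for its own key
range; because the ranges do not overlap, the dicts can be merged before the
ten largest counts are picked. The function ``top_words`` is hypothetical.

.. code-block:: python

    import heapq


    def top_words(reducer_counts, n=10):
        """Merge per-reducer word counts and return the n most frequent words.

        Hypothetical helper: assumes each dict in reducer_counts covers a key
        range disjoint from the others, so a plain dict merge loses nothing.
        """
        merged = {}
        for counts in reducer_counts:
            merged.update(counts)
        # heapq.nlargest avoids sorting the whole vocabulary.
        best = heapq.nlargest(n, merged, key=merged.get)
        return [(word, merged[word]) for word in best]


    # Toy counts from two reducers covering "a"-"m" and "n"-"z".
    results = [{"and": 5, "in": 4, "a": 3}, {"the": 9, "of": 6, "to": 2}]
    for word, count in top_words(results, n=3):
        print(word, count)

If the key ranges could overlap, the counts would have to be summed (for
example with ``collections.Counter``) rather than merged with ``dict.update``.
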
Note that this example uses `distributed actor handles`_, which are still
considered experimental.

.. _`distributed actor handles`: http://docs.ray.io/en/master/actors.html

There is a ``Mapper`` actor, which has a method ``get_range`` used to retrieve
word counts for words in a certain range:

.. code-block:: python

    import ray


    @ray.remote
    class Mapper(object):
        def __init__(self, title_stream):
            # Constructor: title_stream is a stream of Wikipedia article
            # titles that will be read by this mapper.
            ...

        def get_range(self, article_index, keys):
            # Return the counts of all words whose first letter is between
            # keys[0] and keys[1], reading any articles with index up to
            # article_index that haven't been read yet.
            ...

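
The bookkeeping behind ``get_range`` can be sketched in plain Python, leaving
out Ray and the Wikipedia download. This sketch assumes the stream yields
article *texts* rather than titles, lowercases words, and splits on
whitespace; ``LocalMapper`` is a hypothetical name. In a Ray version, the
class would be decorated with ``@ray.remote`` as above.

.. code-block:: python

    from collections import Counter


    class LocalMapper:
        """Plain-Python stand-in for the Mapper actor (hypothetical sketch).

        Assumes text_stream is an iterator of article texts; the example itself
        reads article titles and fetches each article from Wikipedia.
        """

        def __init__(self, text_stream):
            self.text_stream = text_stream
            self.word_counts = []  # one Counter per article read so far

        def _read_next_article(self):
            text = next(self.text_stream)
            # Assumption: lowercase and split on whitespace only.
            self.word_counts.append(Counter(text.lower().split()))

        def get_range(self, article_index, keys):
            # Read articles lazily until the requested index has been read.
            while len(self.word_counts) <= article_index:
                self._read_next_article()
            first, last = keys
            # Keep only words whose first letter lies in [first, last].
            return {word: count
                    for word, count in self.word_counts[article_index].items()
                    if first <= word[0] <= last}


    mapper = LocalMapper(iter(["the cat sat", "A dog and the cat"]))
    print(mapper.get_range(1, ("a", "d")))  # {'a': 1, 'dog': 1, 'and': 1, 'cat': 1}

Returning only the requested key range keeps each mapper-to-reducer message
small: every reducer receives just its share of the vocabulary.
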
The ``Reducer`` actor holds a list of mappers, calls ``get_range`` on them,
and accumulates the results:

.. code-block:: python

    @ray.remote
    class Reducer(object):
        def __init__(self, keys, *mappers):
            # Constructor for a reducer that gets input from the mappers
            # passed as arguments and accumulates word counts for words whose
            # first letter is between keys[0] and keys[1].
            ...

        def next_reduce_result(self, article_index):
            # Get the word counts for the articles up to article_index that
            # haven't been read yet, accumulate them and return the result.
            ...

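
Similarly, here is a plain-Python sketch of ``next_reduce_result``, assuming
each mapper exposes a synchronous ``get_range(article_index, keys)`` that
returns a dict. With Ray actors, the reducer would instead call
``mapper.get_range.remote(...)`` on every mapper and collect the results with
``ray.get``. ``LocalReducer`` and the toy ``FixedMapper`` are hypothetical.

.. code-block:: python

    from collections import Counter


    class LocalReducer:
        """Plain-Python stand-in for the Reducer actor (hypothetical sketch)."""

        def __init__(self, keys, *mappers):
            self.keys = keys        # (first_letter, last_letter) for this reducer
            self.mappers = mappers  # every mapper feeds every reducer

        def next_reduce_result(self, article_index):
            total = Counter()
            # Ask each mapper for counts in this reducer's key range and sum them.
            for mapper in self.mappers:
                total.update(mapper.get_range(article_index, self.keys))
            return dict(total)


    class FixedMapper:
        """Toy mapper returning precomputed counts, for illustration only."""

        def __init__(self, articles):
            self.articles = articles  # list of {word: count} dicts, one per article

        def get_range(self, article_index, keys):
            first, last = keys
            return {w: c for w, c in self.articles[article_index].items()
                    if first <= w[0] <= last}


    m1 = FixedMapper([{"the": 3, "and": 1}])
    m2 = FixedMapper([{"the": 2, "of": 4}])
    print(LocalReducer(("n", "z"), m1, m2).next_reduce_result(0))

With Ray, issuing all the ``.remote`` calls before calling ``ray.get`` lets the
mappers compute their ranges in parallel.
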
On the driver, we then create a number of mappers and reducers and run the
streaming MapReduce:

.. code-block:: python

    import ray

    ray.init()

    streams = ...  # Create a list of num_mappers streams.
    keys = ...  # Partition the keys among the reducers.

    # Create a number of mappers.
    mappers = [Mapper.remote(stream) for stream in streams]

    # Create a number of reducers, each responsible for a different range of
    # keys. This gives each Reducer actor a handle to each Mapper actor.
    reducers = [Reducer.remote(key, *mappers) for key in keys]

    # Each iteration asks every reducer for the counts of the next round of
    # articles.
    article_index = 0
    while True:
        counts = ray.get([reducer.next_reduce_result.remote(article_index)
                          for reducer in reducers])
        article_index += 1

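
One way to fill in the ``keys`` placeholder is sketched below, assuming words
start with lowercase ASCII letters: split ``a`` to ``z`` into
``num_reducers`` contiguous ranges of near-equal size, each represented as a
``(first, last)`` pair to match the ``keys[0]``/``keys[1]`` convention above.
The function ``partition_keys`` is hypothetical.

.. code-block:: python

    import string


    def partition_keys(num_reducers):
        """Split 'a'..'z' into num_reducers contiguous (first, last) ranges."""
        letters = string.ascii_lowercase
        if not 1 <= num_reducers <= len(letters):
            raise ValueError("num_reducers must be between 1 and 26")
        base, extra = divmod(len(letters), num_reducers)
        ranges, start = [], 0
        for i in range(num_reducers):
            # The first `extra` ranges get one additional letter.
            size = base + (1 if i < extra else 0)
            chunk = letters[start:start + size]
            ranges.append((chunk[0], chunk[-1]))
            start += size
        return ranges


    print(partition_keys(4))  # [('a', 'g'), ('h', 'n'), ('o', 't'), ('u', 'z')]

Under this scheme, words whose first character is not a lowercase letter
(digits, punctuation) fall outside every range and are not counted.
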
The actual example reads a list of articles and creates a stream object that
produces an infinite stream of articles by cycling through the list. This is a
toy example meant to illustrate the idea. In practice, each mapper would read
its own stream of non-repeating items.
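
The repeating stream described above can be sketched with
``itertools.cycle``. The second helper is one hypothetical way to give each
mapper its own non-repeating items, by assigning every ``num_mappers``-th
title to the same mapper; unlike the repeating stream, it is finite. Both
helper names are hypothetical and the titles are placeholders.

.. code-block:: python

    import itertools


    def repeating_stream(titles):
        """Yield titles from the list forever, starting over at the end."""
        return itertools.cycle(titles)


    def disjoint_streams(titles, num_mappers):
        """Give each mapper its own non-repeating (finite) subset of titles.

        Mapper i receives titles i, i + num_mappers, i + 2 * num_mappers, ...
        """
        return [iter(titles[i::num_mappers]) for i in range(num_mappers)]


    titles = ["Ray", "MapReduce", "Python"]
    stream = repeating_stream(titles)
    print([next(stream) for _ in range(5)])
    print([list(s) for s in disjoint_streams(titles, 2)])
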