Streaming MapReduce
===================
This document walks through how to implement a simple streaming application
using Ray's actor capabilities. It implements a streaming MapReduce that
computes word counts on Wikipedia articles.

You can view the `code for this example`_.

.. _`code for this example`: https://github.com/ray-project/ray/tree/master/doc/examples/streaming
To run the example, you need to install the dependencies

.. code-block:: bash

    pip install wikipedia

and then execute the script as follows:

.. code-block:: bash

    python ray/doc/examples/streaming/streaming.py
For each round of articles read, the script will output
the top 10 words in these articles together with their word counts:

.. code-block:: text

    article index = 0
    the 2866
    of 1688
    and 1448
    in 1101
    to 593
    a 553
    is 509
    as 325
    are 284
    by 261
    article index = 1
    the 3597
    of 1971
    and 1735
    in 1429
    to 670
    a 623
    is 578
    as 401
    by 293
    for 285
    article index = 2
    the 3910
    of 2123
    and 1890
    in 1468
    to 658
    a 653
    is 488
    as 364
    by 362
    for 297
    article index = 3
    the 2962
    of 1667
    and 1472
    in 1220
    a 546
    to 538
    is 516
    as 307
    by 253
    for 243
    article index = 4
    the 3523
    of 1866
    and 1690
    in 1475
    to 645
    a 583
    is 572
    as 352
    by 318
    for 306
    ...
Note that this example uses `distributed actor handles`_, which are still
considered experimental.

.. _`distributed actor handles`: http://docs.ray.io/en/master/actors.html
There is a ``Mapper`` actor, which has a method ``get_range`` used to retrieve
word counts for words in a certain range:

.. code-block:: python

    @ray.remote
    class Mapper(object):

        def __init__(self, title_stream):
            # Constructor. The title_stream parameter is a stream of Wikipedia
            # article titles that will be read by this mapper.
            pass

        def get_range(self, article_index, keys):
            # Return counts of all the words with first letter between
            # keys[0] and keys[1] in the articles that haven't been read
            # yet with index up to article_index.
            pass
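The heart of ``get_range`` is filtering word counts by first letter. A minimal, Ray-free sketch of that filtering step (the helper name ``count_range`` is ours, not part of the example):

.. code-block:: python

    from collections import Counter

    def count_range(text, keys):
        # Count words whose first letter lies between keys[0] and keys[1];
        # words outside this range belong to another reducer's key range.
        counts = Counter()
        for word in text.lower().split():
            if keys[0] <= word[0] <= keys[1]:
                counts[word] += 1
        return counts

    # "zebra" starts outside the range ("a", "m"), so it is ignored.
    print(count_range("apple banana zebra mango apple", ("a", "m")))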
The ``Reducer`` actor holds a list of mappers, calls ``get_range`` on them,
and accumulates the results:

.. code-block:: python

    @ray.remote
    class Reducer(object):

        def __init__(self, keys, *mappers):
            # Constructor for a reducer that gets input from the given list
            # of mappers and accumulates word counts for words with first
            # letter between keys[0] and keys[1].
            pass

        def next_reduce_result(self, article_index):
            # Get articles up to article_index that haven't been read yet,
            # accumulate their word counts, and return them.
            pass
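The accumulation step can be sketched without Ray as follows; ``LocalReducer`` is a hypothetical stand-in for the actor, with plain callables in place of mapper handles:

.. code-block:: python

    from collections import Counter

    class LocalReducer:
        # Ray-free stand-in for the Reducer actor (illustrative only).
        def __init__(self, keys, *mappers):
            self.keys = keys
            self.mappers = mappers
            self.total = Counter()

        def next_reduce_result(self, article_index):
            # Pull the partial counts for our key range from every mapper
            # and fold them into the running total.
            for mapper in self.mappers:
                self.total.update(mapper(article_index, self.keys))
            return dict(self.total)

    # Two stand-in "mappers" that return fixed partial counts.
    m1 = lambda index, keys: {"and": 2, "a": 1}
    m2 = lambda index, keys: {"and": 1, "as": 3}
    reducer = LocalReducer(("a", "m"), m1, m2)
    print(reducer.next_reduce_result(0))  # {'and': 3, 'a': 1, 'as': 3}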
On the driver, we then create a number of mappers and reducers and run the
streaming MapReduce:

.. code-block:: python

    streams = ...  # Create a list of num_mappers streams.
    keys = ...  # Partition the keys among the reducers.

    # Create a number of mappers.
    mappers = [Mapper.remote(stream) for stream in streams]

    # Create a number of reducers, each responsible for a different range of
    # keys. This gives each Reducer actor a handle to each Mapper actor.
    reducers = [Reducer.remote(key, *mappers) for key in keys]

    article_index = 0
    while True:
        counts = ray.get([reducer.next_reduce_result.remote(article_index)
                          for reducer in reducers])
        article_index += 1
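To see the overall flow, the driver loop can be imitated in a single process. Everything below (the article list, the two-way key partition, the function names) is made up for illustration, and unlike the actor version it recounts articles from scratch each round rather than tracking what has already been read:

.. code-block:: python

    from collections import Counter

    ARTICLES = [
        "the cat and the hat",
        "a dog and a bone",
    ]

    def mapper_get_range(article_index, keys):
        # Count words in articles 0..article_index whose first letter
        # lies between keys[0] and keys[1].
        counts = Counter()
        for text in ARTICLES[:article_index + 1]:
            for word in text.split():
                if keys[0] <= word[0] <= keys[1]:
                    counts[word] += 1
        return counts

    # Partition the alphabet between two "reducers".
    key_ranges = [("a", "m"), ("n", "z")]
    for article_index in range(len(ARTICLES)):
        partials = [mapper_get_range(article_index, keys) for keys in key_ranges]
        merged = sum(partials, Counter())
        print(article_index, merged.most_common(3))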
The actual example reads a list of articles and creates a stream object that
produces an infinite stream of articles from the list. This is a toy example
meant to illustrate the idea. In practice, we would produce a stream of
non-repeating items for each mapper.
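The repeating stream described above can be sketched with a generator; ``article_stream`` is a hypothetical helper built on ``itertools.cycle``, not part of the example's code:

.. code-block:: python

    import itertools

    def article_stream(titles):
        # Cycle through the finite title list forever, mimicking the
        # example's stream object.
        return itertools.cycle(titles)

    stream = article_stream(["Python", "Ray", "MapReduce"])
    print([next(stream) for _ in range(5)])
    # ['Python', 'Ray', 'MapReduce', 'Python', 'Ray']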