Ray Streaming
=============

Ray Streaming is a streaming data processing framework built on Ray. It helps you build jobs that process real-time data.

Key Features
------------

#.
   **Cross Language**. Built on Ray's multi-language actors, Ray Streaming can run operators written in
   multiple languages (currently only Python and Java are supported) with high efficiency. You can
   implement your operators in different languages and run them in one job.

#.
   **Single Node Failover**. We designed a special failover mechanism that, in most cases, recovers the
   job by rolling back only the failed node itself. This is a significant benefit if your job is
   sensitive to failure recovery time. In other frameworks such as Flink, by contrast, the entire job
   is restarted when a node fails.

Examples
--------

Python
^^^^^^

.. code-block:: Python

   import ray
   from ray.streaming import StreamingContext

   # Start (or connect to) Ray before creating the streaming context.
   ray.init()

   ctx = StreamingContext.Builder() \
       .build()
   ctx.read_text_file(__file__) \
       .set_parallelism(1) \
       .flat_map(lambda x: x.split()) \
       .map(lambda x: (x, 1)) \
       .key_by(lambda x: x[0]) \
       .reduce(lambda old_value, new_value:
               (old_value[0], old_value[1] + new_value[1])) \
       .filter(lambda x: "ray" not in x) \
       .sink(lambda x: print("result", x))
   ctx.submit("word_count")

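
To make the word count pipeline's semantics concrete, here is a plain-Python sketch (no Ray required) of what its operators compute on a small in-memory input. It assumes, as with Flink-style keyed streams, that ``reduce`` emits the updated running aggregate for every incoming record rather than one final count per word. ``simulate_word_count`` is a hypothetical helper written for illustration, not part of the Ray Streaming API.

.. code-block:: python

   from typing import Dict, Iterable, List, Tuple


   def simulate_word_count(lines: Iterable[str]) -> List[Tuple[str, int]]:
       """Hypothetical helper: mimic flat_map -> map -> key_by -> reduce -> filter."""
       state: Dict[str, Tuple[str, int]] = {}  # per-key reduce state (key_by groups by word)
       emitted: List[Tuple[str, int]] = []
       for line in lines:
           for word in line.split():          # flat_map(lambda x: x.split())
               record = (word, 1)             # map(lambda x: (x, 1))
               key = record[0]                # key_by(lambda x: x[0])
               if key in state:               # reduce: combine with the previous value
                   old = state[key]
                   record = (old[0], old[1] + record[1])
               state[key] = record
               # filter(lambda x: "ray" not in x) tests tuple membership,
               # so it drops only records whose word is exactly "ray".
               if "ray" not in record:
                   emitted.append(record)     # sink(...) would receive this record
       return emitted


   for result in simulate_word_count(["hello ray", "hello world"]):
       print("result", result)

With this input the sketch prints ``('hello', 1)``, ``('hello', 2)`` and ``('world', 1)``: a word's count is emitted again each time it grows, and ``('ray', 1)`` never reaches the sink. Because each record is a ``(word, count)`` tuple, the filter removes only the exact word ``"ray"``; a word such as ``"rays"`` passes through.
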
Java
^^^^

.. code-block:: Java

   // WordAndCount is a user-defined class with public `word` and `count` fields;
   // it is not part of the Ray Streaming API.
   StreamingContext context = StreamingContext.buildContext();
   List<String> text = Collections.singletonList("hello world");
   DataStreamSource.fromCollection(context, text)
       .flatMap((FlatMapFunction<String, WordAndCount>) (value, collector) -> {
           String[] records = value.split(" ");
           for (String record : records) {
               collector.collect(new WordAndCount(record, 1));
           }
       })
       .filter(pair -> !pair.word.contains("world"))
       .keyBy(pair -> pair.word)
       .reduce((oldValue, newValue) ->
           new WordAndCount(oldValue.word, oldValue.count + newValue.count))
       .sink(result -> System.out.println("sink result=" + result));
   context.execute("testWordCount");

Use Java Operators in Python
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: Python

   import ray
   from ray.streaming import StreamingContext

   # The Java classes named below must be on the Java workers' classpath.
   ray.init()

   ctx = StreamingContext.Builder().build()
   # Java operators are referenced by fully qualified class name;
   # "$" separates a nested class from its enclosing class.
   ctx.from_values("a", "b", "c") \
       .as_java_stream() \
       .map("io.ray.streaming.runtime.demo.HybridStreamTest$Mapper1") \
       .filter("io.ray.streaming.runtime.demo.HybridStreamTest$Filter1") \
       .as_python_stream() \
       .sink(lambda x: print("result", x))
   ctx.submit("HybridStreamTest")

Use Python Operators in Java
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: Java

   StreamingContext context = StreamingContext.buildContext();
   DataStreamSource<String> streamSource =
       DataStreamSource.fromCollection(context, Arrays.asList("a", "b", "c"));
   streamSource
       .map(x -> x + x)
       .asPythonStream()
       // Python operators are referenced by module name and function name.
       .map("ray.streaming.tests.test_hybrid_stream", "map_func1")
       .filter("ray.streaming.tests.test_hybrid_stream", "filter_func1")
       .asJavaStream()
       .sink(value -> System.out.println("HybridStream sink=" + value));
   context.execute("HybridStreamTestJob");

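
In the cross-language examples, an operator written in the other language is passed by name instead of as a function object: a Java operator by its fully qualified class name, a Python operator by module name plus function name. A name can be looked up on whichever worker runs the operator, whereas a lambda written in one language cannot be executed by the other. The plain-Python sketch below shows one way a module/function pair could be turned into a callable with ``importlib``. ``resolve_function`` is a hypothetical helper, and this illustrates the idea only; it is not Ray Streaming's actual loader.

.. code-block:: python

   import importlib
   from typing import Any, Callable


   def resolve_function(module_name: str, function_name: str) -> Callable[..., Any]:
       """Hypothetical helper: import a module and return one of its callables by name."""
       module = importlib.import_module(module_name)
       func = getattr(module, function_name)
       if not callable(func):
           raise TypeError(f"{module_name}.{function_name} is not callable")
       return func


   # Standard-library stand-ins for a pair such as
   # ("ray.streaming.tests.test_hybrid_stream", "map_func1"), so the sketch runs without Ray.
   map_func = resolve_function("string", "capwords")    # "a" -> "A"
   filter_func = resolve_function("operator", "truth")  # drops empty strings
   values = [v for v in map(map_func, ["a", "", "c"]) if filter_func(v)]
   print(values)  # ['A', 'C']
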
Installation
------------

Python
^^^^^^

Ray Streaming is packaged together with Ray. Install Ray with ``pip install ray``; the wheel contains
all the dependencies you need to run Python streaming jobs, including support for Java operators.

Java
^^^^

Add Ray Streaming to your project with Maven:

.. code-block:: xml

   <dependency>
     <artifactId>ray-api</artifactId>
     <groupId>io.ray</groupId>
     <version>1.0.1</version>
   </dependency>
   <dependency>
     <artifactId>ray-runtime</artifactId>
     <groupId>io.ray</groupId>
     <version>1.0.1</version>
   </dependency>
   <dependency>
     <artifactId>streaming-api</artifactId>
     <groupId>io.ray</groupId>
     <version>1.0.1</version>
   </dependency>
   <dependency>
     <artifactId>streaming-runtime</artifactId>
     <groupId>io.ray</groupId>
     <version>1.0.1</version>
   </dependency>

Internal Design
---------------

Overall Architecture
^^^^^^^^^^^^^^^^^^^^

.. image:: assets/architecture.jpg
   :target: assets/architecture.jpg
   :alt: architecture

Ray Streaming is built on Ray: every component runs as a Ray actor, and actors communicate through
Ray's direct actor calls. There are two main types of actor: the job master and the job workers.
When you submit a job from your driver (``ctx.submit()`` in Python, ``context.execute()`` in Java),
Ray Streaming first creates a job master, which then creates all the job workers needed to run your
operators. The job master is then responsible for coordinating all workers, including checkpointing,
failover, etc.

Check `Ray Streaming Proposal <https://docs.google.com/document/d/1EubVMFSFJqNLmbNztnYKj6m0VMzg3a8ZVQZg-mgbLQ0>`_
for more detailed information about the overall design.

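
The split between one job master and many job workers can be illustrated with a toy, single-process model. In the sketch below the master expands each operator into ``parallelism`` worker instances and then drives a checkpoint across all of them. Everything here is an assumption for illustration: the real job master and workers are Ray actors that communicate through direct actor calls, while ``JobMaster``, ``JobWorker`` and ``trigger_checkpoint`` are hypothetical plain-Python names.

.. code-block:: python

   from dataclasses import dataclass, field
   from typing import Dict, List, Tuple


   @dataclass
   class JobWorker:
       """Stand-in for a job worker actor running one parallel instance of an operator."""
       operator: str
       index: int
       checkpoints: List[int] = field(default_factory=list)

       def checkpoint(self, checkpoint_id: int) -> bool:
           # A real worker would persist its operator state; the toy only records the id.
           self.checkpoints.append(checkpoint_id)
           return True


   class JobMaster:
       """Creates every worker the job needs, then coordinates them."""

       def __init__(self, plan: List[Tuple[str, int]]) -> None:
           # plan: (operator name, parallelism) pairs taken from the user's pipeline.
           self.workers: Dict[str, List[JobWorker]] = {
               op: [JobWorker(op, i) for i in range(parallelism)] for op, parallelism in plan
           }
           self.next_checkpoint_id = 1
           self.last_completed = 0

       def all_workers(self) -> List[JobWorker]:
           return [w for ws in self.workers.values() for w in ws]

       def trigger_checkpoint(self) -> bool:
           checkpoint_id = self.next_checkpoint_id
           self.next_checkpoint_id += 1
           acks = [w.checkpoint(checkpoint_id) for w in self.all_workers()]
           # In this toy model a checkpoint completes only when every worker acknowledges it.
           if all(acks):
               self.last_completed = checkpoint_id
           return all(acks)


   master = JobMaster([("source", 1), ("flat_map", 2), ("sink", 1)])
   print(len(master.all_workers()))  # 4
   master.trigger_checkpoint()
   print(master.last_completed)  # 1

Keeping all coordination in a single master means each worker only has to respond to requests, while decisions such as when a checkpoint is complete are made in one place.
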
Fault Tolerance Mechanism
^^^^^^^^^^^^^^^^^^^^^^^^^

As mentioned above, unlike other frameworks, Ray Streaming can in most cases recover a job by rolling
back only the failed node itself. The main idea is to save the messages each node sends and to replay
them from upstream when a downstream node fails, so healthy nodes do not need to roll back.

Check `Fault Tolerance Proposal <https://docs.google.com/document/d/1NKjGr7fi-45cEzWA-N_wJ5CoUgaJfnsW9YeWsSg1shY>`_
for more detailed information about our fault tolerance mechanism.

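
The save-and-replay idea can be sketched in a single process, assuming one upstream and one downstream node, messages tagged with increasing sequence ids, and a downstream checkpoint that records both its state and the last sequence id it consumed. ``UpstreamBuffer``, ``CountingNode`` and their methods are hypothetical names; the actual protocol is described in the Fault Tolerance Proposal.

.. code-block:: python

   from typing import Dict, List, Tuple

   Message = Tuple[int, str]  # (sequence id, payload)


   class UpstreamBuffer:
       """Keeps the messages a node has sent so they can be replayed to a recovering node."""

       def __init__(self) -> None:
           self.messages: List[Message] = []
           self.next_seq = 0

       def send(self, payload: str) -> Message:
           msg = (self.next_seq, payload)
           self.messages.append(msg)
           self.next_seq += 1
           return msg

       def replay_after(self, seq: int) -> List[Message]:
           return [m for m in self.messages if m[0] > seq]

       def trim_through(self, seq: int) -> None:
           # Messages already covered by a downstream checkpoint are no longer needed.
           self.messages = [m for m in self.messages if m[0] > seq]


   class CountingNode:
       """Downstream node: a word count plus the last sequence id it consumed."""

       def __init__(self) -> None:
           self.counts: Dict[str, int] = {}
           self.last_seq = -1
           self.snapshot: Tuple[Dict[str, int], int] = ({}, -1)

       def process(self, msg: Message) -> None:
           seq, word = msg
           self.counts[word] = self.counts.get(word, 0) + 1
           self.last_seq = seq

       def checkpoint(self) -> int:
           self.snapshot = (dict(self.counts), self.last_seq)
           return self.last_seq

       def recover(self, upstream: UpstreamBuffer) -> None:
           # Roll back only this node, then replay everything sent after its checkpoint.
           counts, seq = self.snapshot
           self.counts, self.last_seq = dict(counts), seq
           for msg in upstream.replay_after(seq):
               self.process(msg)


   upstream, node = UpstreamBuffer(), CountingNode()
   for word in ["a", "b", "a"]:
       node.process(upstream.send(word))
   upstream.trim_through(node.checkpoint())  # checkpoint covers sequence ids 0..2
   for word in ["b", "c"]:
       node.process(upstream.send(word))
   node.counts, node.last_seq = {}, -1  # simulate a crash that loses in-memory state
   node.recover(upstream)               # the upstream node never rolls back
   print(node.counts)  # {'a': 2, 'b': 2, 'c': 1}

The trade-off in this sketch is memory for recovery time: the upstream node must buffer every message newer than the downstream checkpoint, but only the failed node's state is rebuilt.
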
Development Guides
------------------

#.
   Build Ray Streaming for Java

   * Build Ray

     * ``bazel build //java:gen_maven_deps``
     * ``cd java && mvn clean install -Dmaven.test.skip=true && cd ..``

   * Build Ray Streaming

     * ``bazel build //streaming/java:gen_maven_deps``
     * ``mvn clean install -Dmaven.test.skip=true``

#.
   Building Ray's Python package also builds Ray Streaming for Python.

#.
   Run examples and tests

   .. code-block:: bash

      # c++ test
      cd streaming/ && bazel test ...
      sh src/test/run_streaming_queue_test.sh
      cd ..

      # python test
      pushd python/ray/streaming/
      pushd examples
      python simple.py --input-file toy.txt
      popd
      pushd tests
      pytest .
      popd
      popd

      # java test
      cd streaming/java/streaming-runtime
      mvn test

More Information
----------------

* `Ray Streaming implementation plan <https://github.com/ray-project/ray/issues/6184>`_
* `Fault Tolerance Proposal <https://docs.google.com/document/d/1NKjGr7fi-45cEzWA-N_wJ5CoUgaJfnsW9YeWsSg1shY>`_
* `Data Transfer Proposal <https://docs.google.com/document/d/1cpGr40e9N8knmynqUnnrKhbNnz_6ucn5I2Koq2p4Xp8>`_
* `Ray Streaming Proposal <https://docs.google.com/document/d/1EubVMFSFJqNLmbNztnYKj6m0VMzg3a8ZVQZg-mgbLQ0>`_
* `Open Source Plan <https://docs.google.com/document/d/1fHFpPgXy853z0m--BZ_L1wmQADf2KXDrLQ1ebFlHLws>`_

Getting Involved
----------------

- `Community Slack`_: Join our Slack workspace.
- `GitHub Discussions`_: For discussions about development, questions about usage, and feature requests.
- `GitHub Issues`_: For reporting bugs.

.. _`GitHub Discussions`: https://github.com/ray-project/ray/discussions
.. _`GitHub Issues`: https://github.com/ray-project/ray/issues
.. _`Community Slack`: https://forms.gle/9TSdDYUgxYs8SA9e8