datastream.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. from abc import ABC, abstractmethod
  2. from ray.streaming import function
  3. from ray.streaming import partition
  4. class Stream(ABC):
  5. """
  6. Abstract base class of all stream types. A Stream represents a stream of
  7. elements of the same type. A Stream can be transformed into another Stream
  8. by applying a transformation.
  9. """
  10. def __init__(self, input_stream, j_stream, streaming_context=None):
  11. self.input_stream = input_stream
  12. self._j_stream = j_stream
  13. if streaming_context is None:
  14. assert input_stream is not None
  15. self.streaming_context = input_stream.streaming_context
  16. else:
  17. self.streaming_context = streaming_context
  18. def get_streaming_context(self):
  19. return self.streaming_context
  20. def get_parallelism(self):
  21. """
  22. Returns:
  23. the parallelism of this transformation
  24. """
  25. return self._gateway_client(). \
  26. call_method(self._j_stream, "getParallelism")
  27. def set_parallelism(self, parallelism: int):
  28. """Sets the parallelism of this transformation
  29. Args:
  30. parallelism: The new parallelism to set on this transformation
  31. Returns:
  32. self
  33. """
  34. self._gateway_client(). \
  35. call_method(self._j_stream, "setParallelism", parallelism)
  36. return self
  37. def get_input_stream(self):
  38. """
  39. Returns:
  40. input stream of this stream
  41. """
  42. return self.input_stream
  43. def get_id(self):
  44. """
  45. Returns:
  46. An unique id identifies this stream.
  47. """
  48. return self._gateway_client(). \
  49. call_method(self._j_stream, "getId")
  50. def with_config(self, key=None, value=None, conf=None):
  51. """Set stream config.
  52. Args:
  53. key: a key name string for configuration property
  54. value: a value string for configuration property
  55. conf: multi key-value pairs as a dict
  56. Returns:
  57. self
  58. """
  59. if key is not None:
  60. assert type(key) is str
  61. assert type(value) is str
  62. self._gateway_client(). \
  63. call_method(self._j_stream, "withConfig", key, value)
  64. if conf is not None:
  65. for k, v in conf.items():
  66. assert type(k) is str
  67. assert type(v) is str
  68. self._gateway_client(). \
  69. call_method(self._j_stream, "withConfig", conf)
  70. return self
  71. def get_config(self):
  72. """
  73. Returns:
  74. A dict config for this stream
  75. """
  76. return self._gateway_client().call_method(self._j_stream, "getConfig")
  77. @abstractmethod
  78. def get_language(self):
  79. pass
  80. def forward(self):
  81. """Set the partition function of this {@link Stream} so that output
  82. elements are forwarded to next operator locally."""
  83. self._gateway_client().call_method(self._j_stream, "forward")
  84. return self
  85. def disable_chain(self):
  86. """Disable chain for this stream so that it will be run in a separate
  87. task."""
  88. self._gateway_client().call_method(self._j_stream, "disableChain")
  89. return self
  90. def _gateway_client(self):
  91. return self.get_streaming_context()._gateway_client
  92. class DataStream(Stream):
  93. """
  94. Represents a stream of data which applies a transformation executed by
  95. python. It's also a wrapper of java
  96. `io.ray.streaming.python.stream.PythonDataStream`
  97. """
  98. def __init__(self, input_stream, j_stream, streaming_context=None):
  99. super().__init__(
  100. input_stream, j_stream, streaming_context=streaming_context)
  101. def get_language(self):
  102. return function.Language.PYTHON
  103. def map(self, func):
  104. """
  105. Applies a Map transformation on a :class:`DataStream`.
  106. The transformation calls a :class:`ray.streaming.function.MapFunction`
  107. for each element of the DataStream.
  108. Args:
  109. func: The MapFunction that is called for each element of the
  110. DataStream. If `func` is a python function instead of a subclass
  111. of MapFunction, it will be wrapped as SimpleMapFunction.
  112. Returns:
  113. A new data stream transformed by the MapFunction.
  114. """
  115. if not isinstance(func, function.MapFunction):
  116. func = function.SimpleMapFunction(func)
  117. j_func = self._gateway_client().create_py_func(
  118. function.serialize(func))
  119. j_stream = self._gateway_client(). \
  120. call_method(self._j_stream, "map", j_func)
  121. return DataStream(self, j_stream)
  122. def flat_map(self, func):
  123. """
  124. Applies a FlatMap transformation on a :class:`DataStream`. The
  125. transformation calls a :class:`ray.streaming.function.FlatMapFunction`
  126. for each element of the DataStream.
  127. Each FlatMapFunction call can return any number of elements including
  128. none.
  129. Args:
  130. func: The FlatMapFunction that is called for each element of the
  131. DataStream. If `func` is a python function instead of a subclass
  132. of FlatMapFunction, it will be wrapped as SimpleFlatMapFunction.
  133. Returns:
  134. The transformed DataStream
  135. """
  136. if not isinstance(func, function.FlatMapFunction):
  137. func = function.SimpleFlatMapFunction(func)
  138. j_func = self._gateway_client().create_py_func(
  139. function.serialize(func))
  140. j_stream = self._gateway_client(). \
  141. call_method(self._j_stream, "flatMap", j_func)
  142. return DataStream(self, j_stream)
  143. def filter(self, func):
  144. """
  145. Applies a Filter transformation on a :class:`DataStream`. The
  146. transformation calls a :class:`ray.streaming.function.FilterFunction`
  147. for each element of the DataStream.
  148. DataStream and retains only those element for which the function
  149. returns True.
  150. Args:
  151. func: The FilterFunction that is called for each element of the
  152. DataStream. If `func` is a python function instead of a subclass of
  153. FilterFunction, it will be wrapped as SimpleFilterFunction.
  154. Returns:
  155. The filtered DataStream
  156. """
  157. if not isinstance(func, function.FilterFunction):
  158. func = function.SimpleFilterFunction(func)
  159. j_func = self._gateway_client().create_py_func(
  160. function.serialize(func))
  161. j_stream = self._gateway_client(). \
  162. call_method(self._j_stream, "filter", j_func)
  163. return DataStream(self, j_stream)
  164. def union(self, *streams):
  165. """Apply union transformations to this stream by merging data stream
  166. outputs of the same type with each other.
  167. Args:
  168. *streams: The DataStreams to union output with.
  169. Returns:
  170. A new UnionStream.
  171. """
  172. assert len(streams) >= 1, "Need at least one stream to union with"
  173. j_streams = [s._j_stream for s in streams]
  174. j_stream = self._gateway_client().union(self._j_stream, *j_streams)
  175. return UnionStream(self, j_stream)
  176. def key_by(self, func):
  177. """
  178. Creates a new :class:`KeyDataStream` that uses the provided key to
  179. partition data stream by key.
  180. Args:
  181. func: The KeyFunction that is used for extracting the key for
  182. partitioning. If `func` is a python function instead of a subclass
  183. of KeyFunction, it will be wrapped as SimpleKeyFunction.
  184. Returns:
  185. A KeyDataStream
  186. """
  187. self._check_partition_call()
  188. if not isinstance(func, function.KeyFunction):
  189. func = function.SimpleKeyFunction(func)
  190. j_func = self._gateway_client().create_py_func(
  191. function.serialize(func))
  192. j_stream = self._gateway_client(). \
  193. call_method(self._j_stream, "keyBy", j_func)
  194. return KeyDataStream(self, j_stream)
  195. def broadcast(self):
  196. """
  197. Sets the partitioning of the :class:`DataStream` so that the output
  198. elements are broadcast to every parallel instance of the next
  199. operation.
  200. Returns:
  201. The DataStream with broadcast partitioning set.
  202. """
  203. self._check_partition_call()
  204. self._gateway_client().call_method(self._j_stream, "broadcast")
  205. return self
  206. def partition_by(self, partition_func):
  207. """
  208. Sets the partitioning of the :class:`DataStream` so that the elements
  209. of stream are partitioned by specified partition function.
  210. Args:
  211. partition_func: partition function.
  212. If `func` is a python function instead of a subclass of Partition,
  213. it will be wrapped as SimplePartition.
  214. Returns:
  215. The DataStream with specified partitioning set.
  216. """
  217. self._check_partition_call()
  218. if not isinstance(partition_func, partition.Partition):
  219. partition_func = partition.SimplePartition(partition_func)
  220. j_partition = self._gateway_client().create_py_func(
  221. partition.serialize(partition_func))
  222. self._gateway_client(). \
  223. call_method(self._j_stream, "partitionBy", j_partition)
  224. return self
  225. def _check_partition_call(self):
  226. """
  227. If parent stream is a java stream, we can't call partition related
  228. methods in the python stream
  229. """
  230. if self.input_stream is not None and \
  231. self.input_stream.get_language() == function.Language.JAVA:
  232. raise Exception("Partition related methods can't be called on a "
  233. "python stream if parent stream is a java stream.")
  234. def sink(self, func):
  235. """
  236. Create a StreamSink with the given sink.
  237. Args:
  238. func: sink function.
  239. Returns:
  240. a StreamSink.
  241. """
  242. if not isinstance(func, function.SinkFunction):
  243. func = function.SimpleSinkFunction(func)
  244. j_func = self._gateway_client().create_py_func(
  245. function.serialize(func))
  246. j_stream = self._gateway_client(). \
  247. call_method(self._j_stream, "sink", j_func)
  248. return StreamSink(self, j_stream, func)
  249. def as_java_stream(self):
  250. """
  251. Convert this stream as a java JavaDataStream.
  252. The converted stream and this stream are the same logical stream,
  253. which has same stream id. Changes in converted stream will be reflected
  254. in this stream and vice versa.
  255. """
  256. j_stream = self._gateway_client(). \
  257. call_method(self._j_stream, "asJavaStream")
  258. return JavaDataStream(self, j_stream)
  259. class JavaDataStream(Stream):
  260. """
  261. Represents a stream of data which applies a transformation executed by
  262. java. It's also a wrapper of java
  263. `io.ray.streaming.api.stream.DataStream`
  264. """
  265. def __init__(self, input_stream, j_stream, streaming_context=None):
  266. super().__init__(
  267. input_stream, j_stream, streaming_context=streaming_context)
  268. def get_language(self):
  269. return function.Language.JAVA
  270. def map(self, java_func_class):
  271. """See io.ray.streaming.api.stream.DataStream.map"""
  272. return JavaDataStream(self, self._unary_call("map", java_func_class))
  273. def flat_map(self, java_func_class):
  274. """See io.ray.streaming.api.stream.DataStream.flatMap"""
  275. return JavaDataStream(self, self._unary_call("flatMap",
  276. java_func_class))
  277. def filter(self, java_func_class):
  278. """See io.ray.streaming.api.stream.DataStream.filter"""
  279. return JavaDataStream(self, self._unary_call("filter",
  280. java_func_class))
  281. def union(self, *streams):
  282. """See io.ray.streaming.api.stream.DataStream.union"""
  283. assert len(streams) >= 1, "Need at least one stream to union with"
  284. j_streams = [s._j_stream for s in streams]
  285. j_stream = self._gateway_client().union(self._j_stream, *j_streams)
  286. return JavaUnionStream(self, j_stream)
  287. def key_by(self, java_func_class):
  288. """See io.ray.streaming.api.stream.DataStream.keyBy"""
  289. self._check_partition_call()
  290. return JavaKeyDataStream(self,
  291. self._unary_call("keyBy", java_func_class))
  292. def broadcast(self, java_func_class):
  293. """See io.ray.streaming.api.stream.DataStream.broadcast"""
  294. self._check_partition_call()
  295. return JavaDataStream(self,
  296. self._unary_call("broadcast", java_func_class))
  297. def partition_by(self, java_func_class):
  298. """See io.ray.streaming.api.stream.DataStream.partitionBy"""
  299. self._check_partition_call()
  300. return JavaDataStream(self,
  301. self._unary_call("partitionBy", java_func_class))
  302. def sink(self, java_func_class):
  303. """See io.ray.streaming.api.stream.DataStream.sink"""
  304. return JavaStreamSink(self, self._unary_call("sink", java_func_class))
  305. def as_python_stream(self):
  306. """
  307. Convert this stream as a python DataStream.
  308. The converted stream and this stream are the same logical stream,
  309. which has same stream id. Changes in converted stream will be reflected
  310. in this stream and vice versa.
  311. """
  312. j_stream = self._gateway_client(). \
  313. call_method(self._j_stream, "asPythonStream")
  314. return DataStream(self, j_stream)
  315. def _check_partition_call(self):
  316. """
  317. If parent stream is a python stream, we can't call partition related
  318. methods in the java stream
  319. """
  320. if self.input_stream is not None and \
  321. self.input_stream.get_language() == function.Language.PYTHON:
  322. raise Exception("Partition related methods can't be called on a"
  323. "java stream if parent stream is a python stream.")
  324. def _unary_call(self, func_name, java_func_class):
  325. j_func = self._gateway_client().new_instance(java_func_class)
  326. j_stream = self._gateway_client(). \
  327. call_method(self._j_stream, func_name, j_func)
  328. return j_stream
  329. class KeyDataStream(DataStream):
  330. """Represents a DataStream returned by a key-by operation.
  331. Wrapper of java io.ray.streaming.python.stream.PythonKeyDataStream
  332. """
  333. def __init__(self, input_stream, j_stream):
  334. super().__init__(input_stream, j_stream)
  335. def reduce(self, func):
  336. """
  337. Applies a reduce transformation on the grouped data stream grouped on
  338. by the given key function.
  339. The :class:`ray.streaming.function.ReduceFunction` will receive input
  340. values based on the key value. Only input values with the same key will
  341. go to the same reducer.
  342. Args:
  343. func: The ReduceFunction that will be called for every element of
  344. the input values with the same key. If `func` is a python function
  345. instead of a subclass of ReduceFunction, it will be wrapped as
  346. SimpleReduceFunction.
  347. Returns:
  348. A transformed DataStream.
  349. """
  350. if not isinstance(func, function.ReduceFunction):
  351. func = function.SimpleReduceFunction(func)
  352. j_func = self._gateway_client().create_py_func(
  353. function.serialize(func))
  354. j_stream = self._gateway_client(). \
  355. call_method(self._j_stream, "reduce", j_func)
  356. return DataStream(self, j_stream)
  357. def as_java_stream(self):
  358. """
  359. Convert this stream as a java KeyDataStream.
  360. The converted stream and this stream are the same logical stream,
  361. which has same stream id. Changes in converted stream will be reflected
  362. in this stream and vice versa.
  363. """
  364. j_stream = self._gateway_client(). \
  365. call_method(self._j_stream, "asJavaStream")
  366. return JavaKeyDataStream(self, j_stream)
  367. class JavaKeyDataStream(JavaDataStream):
  368. """
  369. Represents a DataStream returned by a key-by operation in java.
  370. Wrapper of io.ray.streaming.api.stream.KeyDataStream
  371. """
  372. def __init__(self, input_stream, j_stream):
  373. super().__init__(input_stream, j_stream)
  374. def reduce(self, java_func_class):
  375. """See io.ray.streaming.api.stream.KeyDataStream.reduce"""
  376. return JavaDataStream(self,
  377. super()._unary_call("reduce", java_func_class))
  378. def as_python_stream(self):
  379. """
  380. Convert this stream as a python KeyDataStream.
  381. The converted stream and this stream are the same logical stream,
  382. which has same stream id. Changes in converted stream will be reflected
  383. in this stream and vice versa.
  384. """
  385. j_stream = self._gateway_client(). \
  386. call_method(self._j_stream, "asPythonStream")
  387. return KeyDataStream(self, j_stream)
  388. class UnionStream(DataStream):
  389. """Represents a union stream.
  390. Wrapper of java io.ray.streaming.python.stream.PythonUnionStream
  391. """
  392. def __init__(self, input_stream, j_stream):
  393. super().__init__(input_stream, j_stream)
  394. def get_language(self):
  395. return function.Language.PYTHON
  396. class JavaUnionStream(JavaDataStream):
  397. """Represents a java union stream.
  398. Wrapper of java io.ray.streaming.api.stream.UnionStream
  399. """
  400. def __init__(self, input_stream, j_stream):
  401. super().__init__(input_stream, j_stream)
  402. def get_language(self):
  403. return function.Language.JAVA
  404. class StreamSource(DataStream):
  405. """Represents a source of the DataStream.
  406. Wrapper of java io.ray.streaming.python.stream.PythonStreamSource
  407. """
  408. def __init__(self, j_stream, streaming_context, source_func):
  409. super().__init__(None, j_stream, streaming_context=streaming_context)
  410. self.source_func = source_func
  411. def get_language(self):
  412. return function.Language.PYTHON
  413. @staticmethod
  414. def build_source(streaming_context, func):
  415. """Build a StreamSource source from a source function.
  416. Args:
  417. streaming_context: Stream context
  418. func: A instance of `SourceFunction`
  419. Returns:
  420. A StreamSource
  421. """
  422. j_stream = streaming_context._gateway_client. \
  423. create_py_stream_source(function.serialize(func))
  424. return StreamSource(j_stream, streaming_context, func)
  425. class JavaStreamSource(JavaDataStream):
  426. """Represents a source of the java DataStream.
  427. Wrapper of java io.ray.streaming.api.stream.DataStreamSource
  428. """
  429. def __init__(self, j_stream, streaming_context):
  430. super().__init__(None, j_stream, streaming_context=streaming_context)
  431. def get_language(self):
  432. return function.Language.JAVA
  433. @staticmethod
  434. def build_source(streaming_context, java_source_func_class):
  435. """Build a java StreamSource source from a java source function.
  436. Args:
  437. streaming_context: Stream context
  438. java_source_func_class: qualified class name of java SourceFunction
  439. Returns:
  440. A java StreamSource
  441. """
  442. j_func = streaming_context._gateway_client() \
  443. .new_instance(java_source_func_class)
  444. j_stream = streaming_context._gateway_client() \
  445. .call_function("io.ray.streaming.api.stream.DataStreamSource"
  446. "fromSource", streaming_context._j_ctx, j_func)
  447. return JavaStreamSource(j_stream, streaming_context)
  448. class StreamSink(Stream):
  449. """Represents a sink of the DataStream.
  450. Wrapper of java io.ray.streaming.python.stream.PythonStreamSink
  451. """
  452. def __init__(self, input_stream, j_stream, func):
  453. super().__init__(input_stream, j_stream)
  454. def get_language(self):
  455. return function.Language.PYTHON
  456. class JavaStreamSink(Stream):
  457. """Represents a sink of the java DataStream.
  458. Wrapper of java io.ray.streaming.api.stream.StreamSink
  459. """
  460. def __init__(self, input_stream, j_stream):
  461. super().__init__(input_stream, j_stream)
  462. def get_language(self):
  463. return function.Language.JAVA