123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- import sys
- import ray
- from ray.streaming import StreamingContext
- def test_data_stream():
- ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
- ctx = StreamingContext.Builder().build()
- stream = ctx.from_values(1, 2, 3)
- java_stream = stream.as_java_stream()
- python_stream = java_stream.as_python_stream()
- assert stream.get_id() == java_stream.get_id()
- assert stream.get_id() == python_stream.get_id()
- python_stream.set_parallelism(10)
- assert stream.get_parallelism() == java_stream.get_parallelism()
- assert stream.get_parallelism() == python_stream.get_parallelism()
- ray.shutdown()
- def test_key_data_stream():
- ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
- ctx = StreamingContext.Builder().build()
- key_stream = ctx.from_values(
- "a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0])
- java_stream = key_stream.as_java_stream()
- python_stream = java_stream.as_python_stream()
- assert key_stream.get_id() == java_stream.get_id()
- assert key_stream.get_id() == python_stream.get_id()
- python_stream.set_parallelism(10)
- assert key_stream.get_parallelism() == java_stream.get_parallelism()
- assert key_stream.get_parallelism() == python_stream.get_parallelism()
- ray.shutdown()
- def test_stream_config():
- ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
- ctx = StreamingContext.Builder().build()
- stream = ctx.from_values(1, 2, 3)
- stream.with_config("k1", "v1")
- print("config", stream.get_config())
- assert stream.get_config() == {"k1": "v1"}
- stream.with_config(conf={"k2": "v2", "k3": "v3"})
- print("config", stream.get_config())
- assert stream.get_config() == {"k1": "v1", "k2": "v2", "k3": "v3"}
- java_stream = stream.as_java_stream()
- java_stream.with_config(conf={"k4": "v4"})
- config = java_stream.get_config()
- print("config", config)
- assert config == {"k1": "v1", "k2": "v2", "k3": "v3", "k4": "v4"}
- ray.shutdown()
|