# test_stream.py (2.0 KB)
import sys

import ray
from ray.streaming import StreamingContext
  4. def test_data_stream():
  5. ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
  6. ctx = StreamingContext.Builder().build()
  7. stream = ctx.from_values(1, 2, 3)
  8. java_stream = stream.as_java_stream()
  9. python_stream = java_stream.as_python_stream()
  10. assert stream.get_id() == java_stream.get_id()
  11. assert stream.get_id() == python_stream.get_id()
  12. python_stream.set_parallelism(10)
  13. assert stream.get_parallelism() == java_stream.get_parallelism()
  14. assert stream.get_parallelism() == python_stream.get_parallelism()
  15. ray.shutdown()
  16. def test_key_data_stream():
  17. ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
  18. ctx = StreamingContext.Builder().build()
  19. key_stream = ctx.from_values(
  20. "a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0])
  21. java_stream = key_stream.as_java_stream()
  22. python_stream = java_stream.as_python_stream()
  23. assert key_stream.get_id() == java_stream.get_id()
  24. assert key_stream.get_id() == python_stream.get_id()
  25. python_stream.set_parallelism(10)
  26. assert key_stream.get_parallelism() == java_stream.get_parallelism()
  27. assert key_stream.get_parallelism() == python_stream.get_parallelism()
  28. ray.shutdown()
  29. def test_stream_config():
  30. ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
  31. ctx = StreamingContext.Builder().build()
  32. stream = ctx.from_values(1, 2, 3)
  33. stream.with_config("k1", "v1")
  34. print("config", stream.get_config())
  35. assert stream.get_config() == {"k1": "v1"}
  36. stream.with_config(conf={"k2": "v2", "k3": "v3"})
  37. print("config", stream.get_config())
  38. assert stream.get_config() == {"k1": "v1", "k2": "v2", "k3": "v3"}
  39. java_stream = stream.as_java_stream()
  40. java_stream.with_config(conf={"k4": "v4"})
  41. config = java_stream.get_config()
  42. print("config", config)
  43. assert config == {"k1": "v1", "k2": "v2", "k3": "v3", "k4": "v4"}
  44. ray.shutdown()