test_word_count.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import os
  2. import sys
  3. import ray
  4. from ray.streaming import StreamingContext
  5. from ray._private.test_utils import wait_for_condition
  def test_word_count():
      """Smoke-test a streaming word count over this file's own text.

      Starts Ray, builds a streaming pipeline that reads this source file,
      splits each item on whitespace, pairs every word with a count of 1,
      keys by the word, sums the counts per key and prints every result that
      passes the filter.  The test asserts nothing about the counts; it only
      submits the job and waits a few seconds before shutting Ray down.
      """
      import time

      ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
      try:
          ctx = StreamingContext.Builder().build()
          (
              ctx.read_text_file(__file__)
              .set_parallelism(1)
              .flat_map(lambda x: x.split())
              .map(lambda x: (x, 1))
              .key_by(lambda x: x[0])
              .reduce(lambda old_value, new_value:
                      (old_value[0], old_value[1] + new_value[1]))
              # NOTE(review): assuming the filter receives the (word, count)
              # tuples built above, this is tuple membership, so it only drops
              # entries whose word is exactly "ray" (not words that merely
              # contain it) - confirm that is the intent.
              .filter(lambda x: "ray" not in x)
              .sink(lambda x: print("result", x))
          )
          ctx.submit("word_count")
          # Presumably submit() returns before the job has processed its
          # input, so give it a few seconds to run - TODO confirm.
          time.sleep(3)
      finally:
          # Always shut Ray down, even when the pipeline fails, so that a later
          # ray.init() in the same process does not start from a live session.
          ray.shutdown()
  def test_simple_word_count():
      """Run a word count over fixed values and verify the sink output.

      The source values "a", "b" and "c" are each duplicated by the flat_map
      stage, paired with a count of 1, keyed by the value and summed per key.
      Every item reaching the sink is appended to a scratch file as
      ``"<word>:<count>,"``.  The test passes once the file contains "a:2",
      "b:2" and "c:2"; it polls for that for up to 60 seconds via
      ``wait_for_condition``.
      """
      ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
      try:
          ctx = StreamingContext.Builder().build()

          sink_file = "/tmp/ray_streaming_test_simple_word_count.txt"
          # Start from a clean slate: the sink appends, so output left over
          # from a previous run could satisfy the check below on its own.
          try:
              os.remove(sink_file)
          except FileNotFoundError:
              pass

          def sink_func(x):
              """Append one (word, count) result to the sink file."""
              with open(sink_file, "a") as f:
                  line = "{}:{},".format(x[0], x[1])
                  print("sink_func", line)
                  f.write(line)

          (
              ctx.from_values("a", "b", "c")
              .set_parallelism(1)
              .flat_map(lambda x: [x, x])
              .map(lambda x: (x, 1))
              .key_by(lambda x: x[0])
              .reduce(lambda old_value, new_value:
                      (old_value[0], old_value[1] + new_value[1]))
              .sink(sink_func)
          )
          ctx.submit("word_count")

          def check_succeed():
              """Return True once every word shows a count of 2 in the file."""
              try:
                  with open(sink_file, "r") as f:
                      result = f.read()
              except FileNotFoundError:
                  # The sink has not written anything yet.
                  return False
              return all(token in result for token in ("a:2", "b:2", "c:2"))

          # Presumably raises if the condition is not met within the timeout
          # (verify against ray._private.test_utils); the finally clause below
          # still shuts Ray down in that case.
          wait_for_condition(check_succeed, timeout=60, retry_interval_ms=1000)
          print("Execution succeed")
      finally:
          ray.shutdown()
  if __name__ == "__main__":
      # When executed as a script, run both streaming tests in order.
      for run_test in (test_word_count, test_simple_word_count):
          run_test()