test_env_with_subprocess.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. """Tests that envs clean up after themselves on agent exit."""
  2. import os
  3. import subprocess
  4. import tempfile
  5. import ray
  6. from ray.tune import run_experiments
  7. from ray.tune.registry import register_env
  8. from ray.rllib.examples.env.env_with_subprocess import EnvWithSubprocess
  9. from ray._private.test_utils import wait_for_condition
  10. def leaked_processes():
  11. """Returns whether any subprocesses were leaked."""
  12. result = subprocess.check_output(
  13. "ps aux | grep '{}' | grep -v grep || true".format(
  14. EnvWithSubprocess.UNIQUE_CMD),
  15. shell=True)
  16. return result
  17. if __name__ == "__main__":
  18. # Create 4 temp files, which the Env has to clean up after it's done.
  19. _, tmp1 = tempfile.mkstemp("test_env_with_subprocess")
  20. _, tmp2 = tempfile.mkstemp("test_env_with_subprocess")
  21. _, tmp3 = tempfile.mkstemp("test_env_with_subprocess")
  22. _, tmp4 = tempfile.mkstemp("test_env_with_subprocess")
  23. register_env("subproc", lambda c: EnvWithSubprocess(c))
  24. ray.init()
  25. # Check whether everything is ok.
  26. assert os.path.exists(tmp1)
  27. assert os.path.exists(tmp2)
  28. assert os.path.exists(tmp3)
  29. assert os.path.exists(tmp4)
  30. assert not leaked_processes()
  31. run_experiments({
  32. "demo": {
  33. "run": "PG",
  34. "env": "subproc",
  35. "num_samples": 1,
  36. "config": {
  37. "num_workers": 1,
  38. "env_config": {
  39. "tmp_file1": tmp1,
  40. "tmp_file2": tmp2,
  41. "tmp_file3": tmp3,
  42. "tmp_file4": tmp4,
  43. },
  44. "framework": "tf",
  45. },
  46. "stop": {
  47. "training_iteration": 1
  48. },
  49. },
  50. })
  51. ray.shutdown()
  52. # Check whether processes are still running.
  53. wait_for_condition(lambda: not leaked_processes(), timeout=30)
  54. print("OK")