1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- """Tests that envs clean up after themselves on agent exit."""
- import os
- import subprocess
- import tempfile
- import ray
- from ray.tune import run_experiments
- from ray.tune.registry import register_env
- from ray.rllib.examples.env.env_with_subprocess import EnvWithSubprocess
- from ray._private.test_utils import wait_for_condition
- def leaked_processes():
- """Returns whether any subprocesses were leaked."""
- result = subprocess.check_output(
- "ps aux | grep '{}' | grep -v grep || true".format(
- EnvWithSubprocess.UNIQUE_CMD),
- shell=True)
- return result
- if __name__ == "__main__":
- # Create 4 temp files, which the Env has to clean up after it's done.
- _, tmp1 = tempfile.mkstemp("test_env_with_subprocess")
- _, tmp2 = tempfile.mkstemp("test_env_with_subprocess")
- _, tmp3 = tempfile.mkstemp("test_env_with_subprocess")
- _, tmp4 = tempfile.mkstemp("test_env_with_subprocess")
- register_env("subproc", lambda c: EnvWithSubprocess(c))
- ray.init()
- # Check whether everything is ok.
- assert os.path.exists(tmp1)
- assert os.path.exists(tmp2)
- assert os.path.exists(tmp3)
- assert os.path.exists(tmp4)
- assert not leaked_processes()
- run_experiments({
- "demo": {
- "run": "PG",
- "env": "subproc",
- "num_samples": 1,
- "config": {
- "num_workers": 1,
- "env_config": {
- "tmp_file1": tmp1,
- "tmp_file2": tmp2,
- "tmp_file3": tmp3,
- "tmp_file4": tmp4,
- },
- "framework": "tf",
- },
- "stop": {
- "training_iteration": 1
- },
- },
- })
- ray.shutdown()
- # Check whether processes are still running.
- wait_for_condition(lambda: not leaked_processes(), timeout=30)
- print("OK")
|