run_replay.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import json
  2. import os
  3. import subprocess
  4. import yaml
  5. from argparse import ArgumentParser
  6. from sweagent.environment.utils import is_from_github_url
  7. from typing import Any, Dict, List
  8. def process_single_traj(traj_path: str, config_file: str, data_path: str, suffix: str):
  9. replay_action_trajs_path = "temp_replay.jsonl"
  10. # Open trajectory file, extract responses as actions
  11. if traj_path.endswith(".yaml"):
  12. traj_data = dict()
  13. with open(traj_path, "r") as f:
  14. traj_data["history"] = yaml.safe_load(f)
  15. else:
  16. traj_data = json.load(open(traj_path, "r"))
  17. actions = [x["content"] for x in traj_data["history"] if x["role"] == "assistant"]
  18. instance_id = traj_path.split("/")[-1].split(".")[0]
  19. with open(replay_action_trajs_path, "w") as f:
  20. print(
  21. json.dumps({instance_id: actions}),
  22. file=f,
  23. end="\n",
  24. flush=True
  25. )
  26. # Get data_path from args.yaml
  27. if data_path is None:
  28. args_path = os.path.join(
  29. os.path.dirname(traj_path),
  30. "args.yaml"
  31. )
  32. args = yaml.safe_load(open(args_path))
  33. data_path = args['environment']['data_path']
  34. # Identify the relevant task instance and create it
  35. def create_task_instances_tmp_file(data: List[Dict[str, Any]]) -> str:
  36. """Helper function to create a temporary file to write task instances to.
  37. Returns path to the temporary file.
  38. """
  39. data = [d for d in data if d["instance_id"] == instance_id]
  40. tmp_path = instance_id + ".jsonl"
  41. with open(tmp_path, "w") as f:
  42. for d in data:
  43. print(json.dumps(d), file=f, end="\n", flush=True)
  44. return replay_task_instances_path
  45. is_github = False
  46. if data_path.endswith(".jsonl"):
  47. replay_task_instances_path = create_task_instances_tmp_file([json.loads(x) for x in open(data_path, "r").readlines()])
  48. elif data_path.endswith(".json"):
  49. replay_task_instances_path = create_task_instances_tmp_file(json.load(open(data_path)))
  50. elif is_from_github_url(data_path):
  51. is_github = True
  52. replay_task_instances_path = data_path
  53. else:
  54. raise ValueError("--data_path must be a .json or .jsonl")
  55. # Call run.py via subprocess
  56. command = [
  57. "python",
  58. "run.py",
  59. "--config_file", config_file,
  60. "--data_path", replay_task_instances_path,
  61. "--install_environment", "True",
  62. "--model_name", "replay",
  63. "--replay_path", replay_action_trajs_path,
  64. ]
  65. if is_github:
  66. # Not sure if this only applies to github urls for data_path
  67. command.extend(["--skip_existing", "False"])
  68. if suffix is not None:
  69. command.extend(["--suffix", suffix])
  70. subprocess.run(command)
  71. os.remove(replay_action_trajs_path)
  72. try:
  73. os.remove(replay_task_instances_path)
  74. except FileNotFoundError:
  75. pass
  76. def main(
  77. traj_path: str,
  78. config_file: str,
  79. data_path: str,
  80. suffix: str,
  81. ):
  82. process_single_traj(traj_path, config_file, data_path, suffix)
  83. if __name__ == "__main__":
  84. parser = ArgumentParser()
  85. parser.add_argument("--traj_path", help="Path to trajectory to replay", default=None)
  86. parser.add_argument("--config_file", help="Path to template", required=True)
  87. parser.add_argument("--data_path", help="(Optional) Path to data file containing task instances ref'ed by replay trajectories", default=None)
  88. parser.add_argument("--suffix", help="(Optional) Suffix argument appended to end of traj path", default=None)
  89. args = parser.parse_args()
  90. main(**vars(args))