remote_task.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import io
  2. import tarfile
  3. from typing import Optional
  4. from ray_release.file_manager.file_manager import FileManager
  5. def _pack(source_dir: str) -> bytes:
  6. stream = io.BytesIO()
  7. with tarfile.open(fileobj=stream, mode="w:gz", format=tarfile.PAX_FORMAT) as tar:
  8. tar.add(source_dir, arcname="")
  9. return stream.getvalue()
  10. def _unpack(stream: bytes, target_dir: str):
  11. with tarfile.open(fileobj=io.BytesIO(stream)) as tar:
  12. tar.extractall(target_dir)
  13. def send_dir_to_node(
  14. node_ip: str,
  15. local_dir: str,
  16. remote_dir: str,
  17. ):
  18. import ray
  19. try:
  20. packed = _pack(local_dir)
  21. ray.get(
  22. ray.remote(resources={f"node:{node_ip}": 0.01})(_unpack).remote(
  23. packed, remote_dir
  24. )
  25. )
  26. except Exception as e:
  27. print(
  28. f"Warning: Could not send remote directory contents. Message: " f"{str(e)}"
  29. )
  30. def fetch_dir_from_node(
  31. node_ip: str,
  32. remote_dir: str,
  33. local_dir: str,
  34. ):
  35. import ray
  36. try:
  37. packed = ray.get(
  38. ray.remote(resources={f"node:{node_ip}": 0.01})(_pack).remote(remote_dir)
  39. )
  40. _unpack(packed, local_dir)
  41. except Exception as e:
  42. print(f"Warning: Could not fetch remote directory contents. Message: {str(e)}")
  43. def _get_head_ip():
  44. import ray
  45. return ray.util.get_node_ip_address()
  46. def send_dir_to_head(local_dir: str, remote_dir: str):
  47. import ray
  48. ip = ray.get(ray.remote(_get_head_ip).remote())
  49. return send_dir_to_node(ip, local_dir, remote_dir)
  50. def fetch_dir_fom_head(local_dir: str, remote_dir: str):
  51. import ray
  52. ip = ray.get(ray.remote(_get_head_ip).remote())
  53. return fetch_dir_from_node(ip, remote_dir, local_dir)
  54. class RemoteTaskFileManager(FileManager):
  55. def upload(self, source: Optional[str] = None, target: Optional[str] = None):
  56. send_dir_to_head(source, target)
  57. def download(self, source: str, target: str):
  58. fetch_dir_fom_head(source, target)