12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import io
- import tarfile
- from typing import Optional
- from ray_release.file_manager.file_manager import FileManager
- def _pack(source_dir: str) -> bytes:
- stream = io.BytesIO()
- with tarfile.open(fileobj=stream, mode="w:gz", format=tarfile.PAX_FORMAT) as tar:
- tar.add(source_dir, arcname="")
- return stream.getvalue()
- def _unpack(stream: bytes, target_dir: str):
- with tarfile.open(fileobj=io.BytesIO(stream)) as tar:
- tar.extractall(target_dir)
- def send_dir_to_node(
- node_ip: str,
- local_dir: str,
- remote_dir: str,
- ):
- import ray
- try:
- packed = _pack(local_dir)
- ray.get(
- ray.remote(resources={f"node:{node_ip}": 0.01})(_unpack).remote(
- packed, remote_dir
- )
- )
- except Exception as e:
- print(
- f"Warning: Could not send remote directory contents. Message: " f"{str(e)}"
- )
- def fetch_dir_from_node(
- node_ip: str,
- remote_dir: str,
- local_dir: str,
- ):
- import ray
- try:
- packed = ray.get(
- ray.remote(resources={f"node:{node_ip}": 0.01})(_pack).remote(remote_dir)
- )
- _unpack(packed, local_dir)
- except Exception as e:
- print(f"Warning: Could not fetch remote directory contents. Message: {str(e)}")
- def _get_head_ip():
- import ray
- return ray.util.get_node_ip_address()
- def send_dir_to_head(local_dir: str, remote_dir: str):
- import ray
- ip = ray.get(ray.remote(_get_head_ip).remote())
- return send_dir_to_node(ip, local_dir, remote_dir)
- def fetch_dir_fom_head(local_dir: str, remote_dir: str):
- import ray
- ip = ray.get(ray.remote(_get_head_ip).remote())
- return fetch_dir_from_node(ip, remote_dir, local_dir)
- class RemoteTaskFileManager(FileManager):
- def upload(self, source: Optional[str] = None, target: Optional[str] = None):
- send_dir_to_head(source, target)
- def download(self, source: str, target: str):
- fetch_dir_fom_head(source, target)
|