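"""Benchmark distributed PyTorch training on Ray AIR against vanilla
torch.distributed, using FashionMNIST and a small fully connected network.

Example driver invocation (script name illustrative):

    python torch_benchmark.py run --num-runs 3 --num-epochs 4 --num-workers 4

The ``worker`` subcommand is invoked internally on each node by the vanilla
benchmark and is not meant to be run by hand.
"""
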
import json
import os
import time
from pathlib import Path
from typing import Dict, Tuple

import click
import numpy as np
import torch
from torch import nn, distributed
from torch.utils.data import DataLoader, DistributedSampler
from torch.utils.data.dataloader import default_collate
from torchvision import datasets
from torchvision.transforms import ToTensor

CONFIG = {"lr": 1e-3, "batch_size": 64}
VANILLA_RESULT_JSON = "/tmp/vanilla_out.json"
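
# "batch_size" in CONFIG is the *global* batch size; train_func divides it
# evenly across workers. VANILLA_RESULT_JSON is where rank 0 of the vanilla
# run drops its metrics for the driver to pick up.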


def find_network_interface():
    """Pick an "ens*" interface for NCCL, or fall back to an exclusion list."""
    for iface in os.listdir("/sys/class/net"):
        if iface.startswith("ens"):
            network_interface = iface
            break
    else:
        # for/else: no "ens*" interface was found, so exclude loopback and
        # docker interfaces instead of naming one explicitly.
        network_interface = "^lo,docker"
    return network_interface
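
# NCCL_SOCKET_IFNAME (set in train_torch_vanilla below) accepts either an
# interface name such as "ens5" or, with a leading "^", a comma-separated
# exclusion list: "^lo,docker" means "any interface except lo and docker*".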


# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
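
# Shape check (illustrative): a FashionMNIST batch arrives as (N, 1, 28, 28);
# nn.Flatten turns it into (N, 784) to match the first Linear layer.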


def train_epoch(
    dataloader, model, loss_fn, optimizer, world_size: int, local_rank: int
):
    size = len(dataloader.dataset) // world_size
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"[rank={local_rank}] loss: {loss:>7f} [{current:>5d}/{size:>5d}]")


def validate_epoch(dataloader, model, loss_fn, world_size: int, local_rank: int):
    size = len(dataloader.dataset) // world_size
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"[rank={local_rank}] Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss


def train_func(use_ray: bool, config: Dict):
    local_start_time = time.monotonic()

    if use_ray:
        from ray.air import session
        import ray.train as train

    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]
    shuffle = config.get("shuffle", False)

    if use_ray:
        world_size = session.get_world_size()
    else:
        world_size = distributed.get_world_size()
    # Ray initializes the torch process group in its workers, so this call
    # works in both branches.
    local_rank = distributed.get_rank()

    worker_batch_size = batch_size // world_size

    # Load datasets. Use download=False to catch errors in preparation, as the
    # data should have already been downloaded.
    training_data = datasets.FashionMNIST(
        root="/tmp/data_fashion_mnist",
        train=True,
        download=False,
        transform=ToTensor(),
    )

    test_data = datasets.FashionMNIST(
        root="/tmp/data_fashion_mnist",
        train=False,
        download=False,
        transform=ToTensor(),
    )

    if use_ray:
        # Ray adds a DistributedSampler in train.torch.prepare_data_loader below
        training_sampler = None
        test_sampler = None
    else:
        # In vanilla PyTorch we create the distributed sampler here
        training_sampler = DistributedSampler(training_data, shuffle=shuffle)
        test_sampler = DistributedSampler(test_data, shuffle=shuffle)

    if not use_ray and config.get("use_gpu", False):
        assert torch.cuda.is_available(), "No GPUs available"
        gpu_id = config.get("gpu_id", 0)
        vanilla_device = torch.device(f"cuda:{gpu_id}")
        torch.cuda.set_device(vanilla_device)

        print(
            "Setting GPU ID to",
            gpu_id,
            "with visible devices",
            os.environ.get("CUDA_VISIBLE_DEVICES"),
        )

        # Move every collated batch onto this worker's GPU.
        def collate_fn(x):
            return tuple(x_.to(vanilla_device) for x_ in default_collate(x))

    else:
        vanilla_device = torch.device("cpu")
        collate_fn = None

    # Create data loaders, passing the distributed sampler where we built one.
    # DataLoader forbids shuffle=True together with an explicit sampler, so in
    # the vanilla branch shuffling is delegated to the sampler.
    train_dataloader = DataLoader(
        training_data,
        shuffle=shuffle if training_sampler is None else False,
        batch_size=worker_batch_size,
        sampler=training_sampler,
        collate_fn=collate_fn,
    )
    test_dataloader = DataLoader(
        test_data,
        shuffle=shuffle if test_sampler is None else False,
        batch_size=worker_batch_size,
        sampler=test_sampler,
        collate_fn=collate_fn,
    )

    if use_ray:
        # In Ray, we now retrofit the DistributedSampler
        train_dataloader = train.torch.prepare_data_loader(train_dataloader)
        test_dataloader = train.torch.prepare_data_loader(test_dataloader)
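        # prepare_data_loader adds a DistributedSampler and moves each batch to
        # the worker's assigned device, mirroring what the vanilla branch does
        # by hand above.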

    # Create model.
    model = NeuralNetwork()

    # Prepare model
    if use_ray:
        model = train.torch.prepare_model(model)
    else:
        model = model.to(vanilla_device)
        if config.get("use_gpu", False):
            model = nn.parallel.DistributedDataParallel(
                model, device_ids=[gpu_id], output_device=gpu_id
            )
        else:
            model = nn.parallel.DistributedDataParallel(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for _ in range(epochs):
        train_epoch(
            train_dataloader,
            model,
            loss_fn,
            optimizer,
            world_size=world_size,
            local_rank=local_rank,
        )
        loss = validate_epoch(
            test_dataloader,
            model,
            loss_fn,
            world_size=world_size,
            local_rank=local_rank,
        )
    local_time_taken = time.monotonic() - local_start_time

    if use_ray:
        session.report(dict(loss=loss, local_time_taken=local_time_taken))
    else:
        print(f"Reporting loss: {loss:.4f}")
        if local_rank == 0:
            with open(VANILLA_RESULT_JSON, "w") as f:
                json.dump({"loss": loss, "local_time_taken": local_time_taken}, f)


def train_torch_ray_air(
    *,
    config: dict,
    num_workers: int = 4,
    cpus_per_worker: int = 8,
    use_gpu: bool = False,
) -> Tuple[float, float, float]:
    # This function is kicked off by the main() function and runs a full
    # training run using Ray AIR.
    from ray.train.torch import TorchTrainer
    from ray.air.config import ScalingConfig

    def train_loop(config):
        train_func(use_ray=True, config=config)

    start_time = time.monotonic()

    trainer = TorchTrainer(
        train_loop_per_worker=train_loop,
        train_loop_config=config,
        scaling_config=ScalingConfig(
            trainer_resources={"CPU": 0},
            num_workers=num_workers,
            resources_per_worker={"CPU": cpus_per_worker},
            use_gpu=use_gpu,
        ),
    )
    result = trainer.fit()

    time_taken = time.monotonic() - start_time

    print(f"Last result: {result.metrics}")
    return time_taken, result.metrics["local_time_taken"], result.metrics["loss"]
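
# Resource accounting: the Ray trainer above requests num_workers x
# cpus_per_worker CPUs for its workers plus a zero-CPU trainer actor
# (trainer_resources={"CPU": 0}), matching what train_torch_vanilla requests
# per actor below, so the two benchmarks compete for the same resources.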


def train_torch_vanilla_worker(
    *,
    config: dict,
    rank: int,
    world_size: int,
    master_addr: str,
    master_port: int,
    use_gpu: bool = False,
    gpu_id: int = 0,
):
    # This function is kicked off by the main() function and runs the vanilla
    # training script on a single worker.
    backend = "nccl" if use_gpu else "gloo"

    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = str(master_port)
    os.environ["NCCL_BLOCKING_WAIT"] = "1"

    distributed.init_process_group(
        backend=backend, rank=rank, world_size=world_size, init_method="env://"
    )

    config["use_gpu"] = use_gpu
    config["gpu_id"] = gpu_id
    train_func(use_ray=False, config=config)

    distributed.destroy_process_group()
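
# Note: init_method="env://" above means each worker process reads MASTER_ADDR
# and MASTER_PORT from its environment; rank and world size are passed in
# explicitly, which is everything torch.distributed needs for the rendezvous.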


def train_torch_vanilla(
    *,
    config: dict,
    num_workers: int = 4,
    cpus_per_worker: int = 8,
    use_gpu: bool = False,
) -> Tuple[float, float, float]:
    # This function is kicked off by the main() function and subsequently kicks
    # off tasks that run train_torch_vanilla_worker() on the worker nodes.
    from benchmark_util import (
        upload_file_to_all_nodes,
        create_actors_with_options,
        run_commands_on_actors,
        run_fn_on_actors,
        get_ip_port_actors,
        get_gpu_ids_actors,
        map_ips_to_gpus,
        set_cuda_visible_devices,
    )

    path = os.path.abspath(__file__)
    upload_file_to_all_nodes(path)

    num_epochs = config["epochs"]

    try:
        nccl_network_interface = find_network_interface()
        runtime_env = {"env_vars": {"NCCL_SOCKET_IFNAME": nccl_network_interface}}
    except Exception:
        runtime_env = {}

    actors = create_actors_with_options(
        num_actors=num_workers,
        resources={
            "CPU": cpus_per_worker,
            "GPU": int(use_gpu),
        },
        runtime_env=runtime_env,
    )

    run_fn_on_actors(actors=actors, fn=lambda: os.environ.pop("OMP_NUM_THREADS", None))

    # Get IPs and ports for all actors
    ip_ports = get_ip_port_actors(actors=actors)

    # Rank 0 is the master addr/port
    master_addr, master_port = ip_ports[0]

    if use_gpu:
        # Extract IPs
        actor_ips = [ipp[0] for ipp in ip_ports]

        # Get allocated GPU IDs for all actors
        gpu_ids = get_gpu_ids_actors(actors=actors)

        # Build a map of IP to all allocated GPUs on that machine
        ip_to_gpu_map = map_ips_to_gpus(ips=actor_ips, gpus=gpu_ids)

        # Set the environment variables on the workers
        set_cuda_visible_devices(
            actors=actors, actor_ips=actor_ips, ip_to_gpus=ip_to_gpu_map
        )

        use_gpu_ids = [gi[0] for gi in gpu_ids]
    else:
        use_gpu_ids = [0] * num_workers

    cmds = [
        [
            "python",
            path,
            "worker",
            "--num-epochs",
            str(num_epochs),
            "--num-workers",
            str(num_workers),
            "--rank",
            str(rank),
            "--master-addr",
            master_addr,
            "--master-port",
            str(master_port),
            "--batch-size",
            str(config["batch_size"]),
        ]
        + (["--use-gpu"] if use_gpu else [])
        + (["--gpu-id", str(use_gpu_ids[rank])] if use_gpu else [])
        for rank in range(num_workers)
    ]
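    # Example of one rendered command for rank 0 (values illustrative):
    #   python <this script> worker --num-epochs 4 --num-workers 4 --rank 0 \
    #       --master-addr 10.0.0.1 --master-port 29500 --batch-size 64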

    run_fn_on_actors(
        actors=actors, fn=lambda: os.environ.setdefault("OMP_NUM_THREADS", "1")
    )

    start_time = time.monotonic()
    run_commands_on_actors(actors=actors, cmds=cmds)
    time_taken = time.monotonic() - start_time

    loss = 0.0
    local_time_taken = 0.0
    if os.path.exists(VANILLA_RESULT_JSON):
        with open(VANILLA_RESULT_JSON, "r") as f:
            result = json.load(f)
        loss = result["loss"]
        local_time_taken = result["local_time_taken"]

    return time_taken, local_time_taken, loss


@click.group(help="Run Torch benchmarks")
def cli():
    pass


@cli.command(help="Kick off Ray and vanilla benchmarks")
@click.option("--num-runs", type=int, default=1)
@click.option("--num-epochs", type=int, default=4)
@click.option("--num-workers", type=int, default=4)
@click.option("--cpus-per-worker", type=int, default=8)
@click.option("--use-gpu", is_flag=True, default=False)
@click.option("--batch-size", type=int, default=64)
@click.option("--smoke-test", is_flag=True, default=False)
@click.option("--local", is_flag=True, default=False)
def run(
    num_runs: int = 1,
    num_epochs: int = 4,
    num_workers: int = 4,
    cpus_per_worker: int = 8,
    use_gpu: bool = False,
    batch_size: int = 64,
    smoke_test: bool = False,
    local: bool = False,
):
    # Note: smoke_test is accepted but ignored; the release test pipeline
    # passes it, and smaller runs are configured via the batch size instead.
    import ray
    from benchmark_util import upload_file_to_all_nodes, run_command_on_all_nodes

    config = CONFIG.copy()
    config["epochs"] = num_epochs
    config["batch_size"] = batch_size

    if local:
        ray.init(num_cpus=4)
    else:
        ray.init("auto")

    print("Preparing Torch benchmark: Downloading FashionMNIST")

    path = str((Path(__file__).parent / "_torch_prepare.py").absolute())
    upload_file_to_all_nodes(path)
    run_command_on_all_nodes(["python", path])

    times_ray = []
    times_local_ray = []
    losses_ray = []

    times_vanilla = []
    times_local_vanilla = []
    losses_vanilla = []

    for run in range(1, num_runs + 1):
        time.sleep(2)

        print(f"[Run {run}/{num_runs}] Running Torch Ray benchmark")

        time_ray, time_local_ray, loss_ray = train_torch_ray_air(
            num_workers=num_workers,
            cpus_per_worker=cpus_per_worker,
            use_gpu=use_gpu,
            config=config,
        )

        print(
            f"[Run {run}/{num_runs}] Finished Ray training ({num_epochs} epochs) in "
            f"{time_ray:.2f} seconds (local training time: {time_local_ray:.2f}s). "
            f"Observed loss = {loss_ray:.4f}"
        )

        time.sleep(2)

        print(f"[Run {run}/{num_runs}] Running Torch vanilla benchmark")

        time_vanilla, time_local_vanilla, loss_vanilla = train_torch_vanilla(
            num_workers=num_workers,
            cpus_per_worker=cpus_per_worker,
            use_gpu=use_gpu,
            config=config,
        )

        print(
            f"[Run {run}/{num_runs}] Finished vanilla training ({num_epochs} epochs) "
            f"in {time_vanilla:.2f} seconds "
            f"(local training time: {time_local_vanilla:.2f}s). "
            f"Observed loss = {loss_vanilla:.4f}"
        )

        print(
            f"[Run {run}/{num_runs}] Observed results: ",
            {
                "torch_mnist_ray_time_s": time_ray,
                "torch_mnist_ray_local_time_s": time_local_ray,
                "torch_mnist_ray_loss": loss_ray,
                "torch_mnist_vanilla_time_s": time_vanilla,
                "torch_mnist_vanilla_local_time_s": time_local_vanilla,
                "torch_mnist_vanilla_loss": loss_vanilla,
            },
        )

        times_ray.append(time_ray)
        times_local_ray.append(time_local_ray)
        losses_ray.append(loss_ray)

        times_vanilla.append(time_vanilla)
        times_local_vanilla.append(time_local_vanilla)
        losses_vanilla.append(loss_vanilla)

    times_ray_mean = np.mean(times_ray)
    times_ray_sd = np.std(times_ray)

    times_local_ray_mean = np.mean(times_local_ray)
    times_local_ray_sd = np.std(times_local_ray)

    times_vanilla_mean = np.mean(times_vanilla)
    times_vanilla_sd = np.std(times_vanilla)

    times_local_vanilla_mean = np.mean(times_local_vanilla)
    times_local_vanilla_sd = np.std(times_local_vanilla)

    result = {
        "torch_mnist_ray_num_runs": num_runs,
        "torch_mnist_ray_time_s_all": times_ray,
        "torch_mnist_ray_time_s_mean": times_ray_mean,
        "torch_mnist_ray_time_s_sd": times_ray_sd,
        "torch_mnist_ray_time_local_s_all": times_local_ray,
        "torch_mnist_ray_time_local_s_mean": times_local_ray_mean,
        "torch_mnist_ray_time_local_s_sd": times_local_ray_sd,
        "torch_mnist_ray_loss_mean": np.mean(losses_ray),
        "torch_mnist_ray_loss_sd": np.std(losses_ray),
        "torch_mnist_vanilla_time_s_all": times_vanilla,
        "torch_mnist_vanilla_time_s_mean": times_vanilla_mean,
        "torch_mnist_vanilla_time_s_sd": times_vanilla_sd,
        "torch_mnist_vanilla_local_time_s_all": times_local_vanilla,
        "torch_mnist_vanilla_local_time_s_mean": times_local_vanilla_mean,
        "torch_mnist_vanilla_local_time_s_sd": times_local_vanilla_sd,
        "torch_mnist_vanilla_loss_mean": np.mean(losses_vanilla),
        "torch_mnist_vanilla_loss_sd": np.std(losses_vanilla),
    }

    print("Results:", result)
    test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/result.json")
    with open(test_output_json, "wt") as f:
        json.dump(result, f)

    target_ratio = 1.15
    ratio = (
        (times_local_ray_mean / times_local_vanilla_mean)
        if times_local_vanilla_mean != 0.0
        else 1.0
    )
    if ratio > target_ratio:
        raise RuntimeError(
            f"Training on Ray took an average of {times_local_ray_mean:.2f} seconds, "
            f"which is more than {target_ratio:.2f}x of the average vanilla training "
            f"time of {times_local_vanilla_mean:.2f} seconds ({ratio:.2f}x). FAILED"
        )

    print(
        f"Training on Ray took an average of {times_local_ray_mean:.2f} seconds, "
        f"which is less than {target_ratio:.2f}x of the average vanilla training "
        f"time of {times_local_vanilla_mean:.2f} seconds ({ratio:.2f}x). PASSED"
    )
- @cli.command(help="Run PyTorch vanilla worker")
- @click.option("--num-epochs", type=int, default=4)
- @click.option("--num-workers", type=int, default=4)
- @click.option("--rank", type=int, default=0)
- @click.option("--master-addr", type=str, default="")
- @click.option("--master-port", type=int, default=0)
- @click.option("--batch-size", type=int, default=64)
- @click.option("--use-gpu", is_flag=True, default=False)
- @click.option("--gpu-id", type=int, default=0)
- def worker(
- num_epochs: int = 4,
- num_workers: int = 4,
- rank: int = 0,
- master_addr: str = "",
- master_port: int = 0,
- batch_size: int = 64,
- use_gpu: bool = False,
- gpu_id: int = 0,
- ):
- config = CONFIG.copy()
- config["epochs"] = num_epochs
- config["batch_size"] = batch_size
- # Then we kick off the training function on every worker.
- return train_torch_vanilla_worker(
- config=config,
- rank=rank,
- world_size=num_workers,
- master_addr=master_addr,
- master_port=master_port,
- use_gpu=use_gpu,
- gpu_id=gpu_id,
- )


def main():
    return cli()


if __name__ == "__main__":
    main()