{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Scaling Batch Inference with Ray Data\n", "\n", "| Template Specification | Description |\n", "| ---------------------- | ----------- |\n", "| Summary | This template walks through GPU batch inference on an image dataset. |\n", "| Time to Run | Less than 5 minutes to compute predictions on the dataset. |\n", "| Minimum Compute Requirements | No hard requirements. The default is 4 nodes, each with 1 NVIDIA T4 GPU. |\n", "| Cluster Environment | This template uses the latest Anyscale-provided Ray ML image using Python 3.9: [`anyscale/ray-ml:latest-py39-gpu`](https://docs.anyscale.com/reference/base-images/overview). If you want to change to a different cluster environment, make sure that it is based off of this image! |\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "In this example, we will introduce how to use the [Ray Data](https://docs.ray.io/en/latest/data/data.html) for **large-scale image classification batch inference with multiple GPU workers.**\n", "\n", "In particular, we will:\n", "- Load Imagenette dataset from S3 bucket and create a [Ray `Dataset`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html).\n", "- Load a pretrained ResNet model.\n", "- Use [Ray Data](https://docs.ray.io/en/latest/data/data.html) to preprocess the dataset and do model inference parallelizing across multiple GPUs\n", "- Evaluate the predictions and save results to S3/local disk.\n", "\n", "This example will still work even if you do not have GPUs available, but overall performance will be slower." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The default cluster environment used by this template already has all the dependencies\n", "needed to run. If you're using a custom cluster environment, you'll need to install\n", "`torch` and `torchvision` and restart the notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "!pip install torch torchvision" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1: Reading the Dataset from S3" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "[Imagenette](https://github.com/fastai/imagenette) is a subset of Imagenet with 10 classes. We have this dataset hosted publicly in an S3 bucket. Since we are only doing inference here, we load in just the validation split.\n", "\n", "Here, we use [`ray.data.read_images`](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_images.html) to load the validation set from S3. Ray Data also supports reading from a variety of other [datasources and formats](https://docs.ray.io/en/latest/data/loading-data.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "referenced_widgets": [ "217255c5a2ba4ec5890f6f3667f5b429" ] }, "id": "6i15qjnH0hin", "outputId": "c22aaba0-b33a-40f5-cf89-a70847098af2", "tags": [] }, "outputs": [], "source": [ "import ray\n", "\n", "s3_uri = \"s3://anonymous@air-example-data-2/imagenette2/train/\"\n", "\n", "ds = ray.data.read_images(s3_uri, mode=\"RGB\")\n", "ds\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Inspecting the schema, we can see that there is 1 column in the dataset containing the images stored as Numpy arrays." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "ds.schema()\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2: Inference on a single batch" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Next, we can do inference on a single batch of data, using a pre-trained ResNet152 model and following [this PyTorch example](https://pytorch.org/vision/main/models.html#classification).\n", "\n", "Let’s get a batch of 10 from our dataset. Each image in the batch is represented as a Numpy array." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "single_batch = ds.take_batch(10)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "We can visualize 1 image from this batch." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from PIL import Image\n", "\n", "img = Image.fromarray(single_batch[\"image\"][0])\n", "img\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Now, let’s download a pre-trained PyTorch Resnet model and get the required preprocessing transforms to preprocess the images prior to prediction." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from torchvision.models import ResNet152_Weights\n", "from torchvision import transforms\n", "from torchvision import models\n", "\n", "weights = ResNet152_Weights.IMAGENET1K_V1\n", "\n", "# Load the pretrained resnet model and move to GPU.\n", "# Remove the `.cuda()` if using CPU.\n", "model = models.resnet152(weights=weights).cuda()\n", "model.eval()\n", "\n", "imagenet_transforms = weights.transforms\n", "transform = transforms.Compose([transforms.ToTensor(), imagenet_transforms()])\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Then, we apply the transforms to our batch of images, and pass the batch to the model for inference, making sure to use the GPU device for inference.\n", "\n", "All of the images in the batch have been correctly classified as \"tench\" which is a type of fish." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "import torch\n", "\n", "transformed_batch = [transform(image) for image in single_batch[\"image\"]]\n", "with torch.inference_mode():\n", " # Remove the `.cuda()` if doing inference on CPUs.\n", " prediction_results = model(torch.stack(transformed_batch).cuda())\n", " classes = prediction_results.argmax(dim=1).cpu()\n", "\n", "del model # Free up GPU memory\n", "\n", "labels = [weights.meta[\"categories\"][i] for i in classes]\n", "labels\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3: Scaling up to the full Dataset with Ray Data" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "By using Ray Data, we can apply the same logic in the previous section to scale up to the entire dataset, leveraging all the GPUs in our cluster." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Preprocessing" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "First let's convert the preprocessing code to Ray Data. We'll package the preprocessing code within a `preprocess_image` function. This function should take only one argument, which is a dict that contains a single image in the dataset, represented as a numpy array. We use the same `transform` function that was defined above and store the transformed image in a new `transformed_image` field." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from typing import Any, Dict\n", "\n", "\n", "def preprocess_image(row: Dict[str, Any]):\n", " return {\n", " \"original_image\": row[\"image\"],\n", " \"transformed_image\": transform(row[\"image\"]),\n", " }\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Then we use the [`map`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map.html) API to apply the preprocessing to the whole dataset. By using Ray Data's map, we can scale out the preprocessing to all the resources in our Ray cluster.\n", "\n", "Note: the `map` method is lazy, it won't perform execution until we consume the results." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "transformed_ds = ds.map(preprocess_image)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Model Inference" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Next, let's convert the model inference part. Compared with preprocessing, model inference has 2 differences:\n", "1. Model loading and initialization is usually expensive. \n", "2. Model inference can be optimized with hardware acceleration if we process data in batches. Using larger batches improves GPU utilization and the overall runtime of the inference job.\n", "\n", "Thus, we convert the model inference code to the following `ResnetModel` class. In this class, we put the expensive model loading and initialization code in the `__init__` constructor, which will run only once. And we put the model inference code in the `__call__` method, which will be called for each batch.\n", "\n", "The `__call__` method takes a batch of data items, instead of a single one. In this case, the batch is also a dict that has one key named \"image\", and the value is a Numpy array of images represented in `np.ndarray` format. We reuse the same inferencing logic from step 2." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from typing import Dict\n", "import numpy as np\n", "import torch\n", "\n", "\n", "class ResnetModel:\n", " def __init__(self):\n", " self.weights = ResNet152_Weights.IMAGENET1K_V1\n", " self.model = models.resnet152(weights=self.weights).cuda()\n", " self.model.eval()\n", "\n", " def __call__(self, batch: Dict[str, np.ndarray]):\n", " # Convert the numpy array of images into a PyTorch tensor.\n", " torch_batch = torch.from_numpy(batch[\"transformed_image\"])\n", " # Move the tensor batch to GPU if available.\n", " if torch.cuda.is_available():\n", " torch_batch = torch_batch.cuda()\n", " with torch.inference_mode():\n", " prediction = self.model(torch_batch)\n", " predicted_classes = prediction.argmax(dim=1).detach().cpu()\n", " predicted_labels = [\n", " self.weights.meta[\"categories\"][i] for i in predicted_classes\n", " ]\n", " return {\n", " \"predicted_label\": predicted_labels,\n", " \"original_image\": batch[\"original_image\"],\n", " }\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Then we use the [`map_batches`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html) API to apply the model to the whole dataset.\n", "\n", "The first parameter of `map_batches` is the user-defined function (UDF), which can either be a function or a class. Since we are using a class in this case, the UDF will run as long-running [Ray actors](https://docs.ray.io/en/latest/ray-core/actors.html). For class-based UDFs, we use the `compute` argument to specify [`ActorPoolStrategy`](https://docs.ray.io/en/latest/data/api/doc/ray.data.ActorPoolStrategy.html) with the number of parallel actors.\n", "\n", "The `batch_size` argument indicates the number of images in each batch. See the Ray dashboard\n", "for GPU memory usage to experiment with the `batch_size` when using your own model and dataset.\n", "\n", "The `num_gpus` argument specifies the number of GPUs needed for each `ResnetModel` instance. In this case, we want 1 GPU for each model replica. If you are doing CPU inference, you can remove the `num_gpus=1`." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "predictions = transformed_ds.map_batches(\n", " ResnetModel,\n", " compute=ray.data.ActorPoolStrategy(\n", " size=4\n", " ), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n", " num_gpus=1, # Specify 1 GPU per model replica.\n", " batch_size=720, # Use the largest batch size that can fit on our GPUs\n", ")\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Verify and Save Results" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Let's take a small batch of predictions and verify the results." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "prediction_batch = predictions.take_batch(5)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "We see that all the images are correctly classified as \"tench\", which is a type of fish." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "from PIL import Image\n", "\n", "for image, prediction in zip(\n", " prediction_batch[\"original_image\"], prediction_batch[\"predicted_label\"]\n", "):\n", " img = Image.fromarray(image)\n", " display(img)\n", " print(\"Label: \", prediction)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "If the samples look good, we can proceed with saving the results to an external storage, e.g., S3 or local disks. See [the guide on saving data](https://docs.ray.io/en/latest/data/saving-data.html) for all supported storage and file formats.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import tempfile\n", "\n", "temp_dir = tempfile.mkdtemp()\n", "\n", "# Don't save the images as part of the predictions.\n", "predictions = predictions.drop_columns([\"original_image\"])\n", "\n", "# The `local://` prefix is need to make sure all results get written on the head node.\n", "predictions.write_parquet(f\"local://{temp_dir}\")\n", "print(f\"Predictions saved to `{temp_dir}`!\")\n" ] } ], "metadata": { "colab": { "provenance": [] }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" }, "vscode": { "interpreter": { "hash": "a8c1140d108077f4faeb76b2438f85e4ed675f93d004359552883616a1acd54c" } } }, "nbformat": 4, "nbformat_minor": 4 }