
Scale Model Training and Tuning to GPU

Question

How do I scale model training and hyperparameter tuning to GPUs with Metaflow?

Solution

To scale horizontally, you can structure Metaflow flows with branching and looping patterns. To scale vertically, you can request more processor and memory resources for a task in your flow by using Metaflow's @batch and @kubernetes decorators.
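For example, here is a minimal sketch that combines both patterns: a foreach fans out one task per work item (horizontal), and @batch requests a GPU, CPUs, and memory for each of those tasks (vertical). The flow, step, and artifact names are illustrative, and the sketch assumes AWS Batch is configured for your Metaflow deployment.

from metaflow import FlowSpec, step, batch

class ScalingSketchFlow(FlowSpec):

    @step
    def start(self):
        # Horizontal scaling: fan out one task per item in this list.
        self.shards = ['a', 'b', 'c']  # hypothetical work items
        self.next(self.process, foreach='shards')

    # Vertical scaling: run each fan-out task remotely with 1 GPU,
    # 4 vCPUs, and 16 GB of memory.
    @batch(gpu=1, cpu=4, memory=16000)
    @step
    def process(self):
        self.result = self.input.upper()
        self.next(self.join)

    @step
    def join(self, inputs):
        # Collect one result per fan-out task.
        self.results = [i.result for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        print(self.results)

if __name__ == '__main__':
    ScalingSketchFlow()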

1. Define PyTorch Dependencies

This example shows how to tune a PyTorch model on GPUs. The torch_steps.py script below contains functions to:

  • Load data.
  • Instantiate a neural net.
  • Train and evaluate a neural net.

The original code for the example comes from the PyTorch documentation.

torch_steps.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

class Net(nn.Module):

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # flatten all dims except batch
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def load_data():
    transform = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
    batch_size = 4
    trainset = torchvision.datasets.CIFAR10(
        root='./data',
        train=True,
        download=True,
        transform=transform
    )
    trainloader = torch.utils.data.DataLoader(
        trainset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=2
    )
    testset = torchvision.datasets.CIFAR10(
        root='./data',
        train=False,
        download=True,
        transform=transform
    )
    testloader = torch.utils.data.DataLoader(
        testset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=2
    )
    classes = ('plane', 'car', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck')
    return trainloader, testloader, classes

def train_model(trainloader, lr, epochs=1):
    net = Net()

    # Use a GPU when one is available, otherwise fall back to CPU.
    if torch.cuda.is_available():
        device = torch.device('cuda:0')
    else:
        device = torch.device('cpu')

    net.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(),
                          lr=lr, momentum=0.9)
    for epoch in range(epochs):
        for i, data in enumerate(trainloader, 0):
            inputs = data[0].to(device)
            labels = data[1].to(device)
            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
    return net

def run_inference_and_tests(net, testloader):
    correct = 0
    total = 0
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            outputs = net(images)
            # the class with the highest logit is the prediction
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct // total
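Before wiring these functions into a flow, you can sanity-check them locally. A minimal, hypothetical smoke test, assuming torch_steps.py is importable; the net is moved to the CPU because run_inference_and_tests feeds it CPU tensors:

# smoke_test.py -- hypothetical local sanity check for torch_steps.py
import torch_steps

if __name__ == '__main__':
    trainloader, testloader, classes = torch_steps.load_data()
    # One epoch of training on whatever device is available.
    net = torch_steps.train_model(trainloader, lr=0.01)
    # Move the net to CPU to match the CPU tensors from testloader.
    accuracy = torch_steps.run_inference_and_tests(net.to('cpu'), testloader)
    print(f"Test accuracy: {accuracy}%")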

2. Run Flow

This flow leverages the functions in torch_steps.py to:

  • Create dataloaders for training and testing.
  • Train multiple models in parallel on GPU instances using Metaflow's @batch decorator in the train step.
    • In this case, each model is assigned a different learning rate. This keeps the demo simple; in practice, you will want to explore more dimensions of the hyperparameter space when you have the resources, as sketched after this list.
  • Evaluate each model.
  • Join the results of evaluation to select the best model.
  • Print the highest accuracy model.
    • You can improve the model by changing hyperparameters in torch_steps.py or by expanding the hyperparameter tuning task with more branches in the flow.
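To search more than one dimension, you can fan out over a grid of hyperparameter combinations instead of a single list of learning rates. A sketch of one way to build such a grid; the second dimension and its values are illustrative:

# Hypothetical sketch: build a grid of hyperparameter combinations
# to use as the foreach list instead of a flat list of learning rates.
from itertools import product

learning_rates = [0.01, 0.001]
momenta = [0.9, 0.99]  # illustrative second dimension

# In a Metaflow step you could assign this list to an artifact and pass
# it to self.next(self.train, foreach='grid'); each train task then
# reads its {'lr': ..., 'momentum': ...} dict from self.input.
grid = [{'lr': lr, 'momentum': m}
        for lr, m in product(learning_rates, momenta)]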

The example uses Metaflow's @conda_base decorator to create a consistent environment across local and remote compute.

scale_tuning_gpu.py
from metaflow import (FlowSpec, step, Parameter,
                      batch, JSONType, conda_base)
import json
import torch_steps

@conda_base(libraries={"pytorch": "1.11.0",
                       "torchvision": "0.12.0"},
            python="3.8")
class GPUFlow(FlowSpec):

    learning_rates = Parameter(
        'learning-rates',
        default=json.dumps([0.01, 0.001]),
        type=JSONType
    )

    @step
    def start(self):
        data = torch_steps.load_data()
        self.trainloader = data[0]
        self.testloader = data[1]
        self.classes = data[2]
        self.next(self.train, foreach='learning_rates')

    @batch(gpu=1)
    @step
    def train(self):
        self.model = torch_steps.train_model(
            self.trainloader,
            lr=self.input
        )
        self.next(self.evaluate_model)

    @step
    def evaluate_model(self):
        result = torch_steps.run_inference_and_tests(
            self.model,
            self.testloader
        )
        self.accuracy = result
        self.next(self.join)

    @step
    def join(self, inputs):
        best_model = None
        best_score = -1
        for i in inputs:
            if i.accuracy > best_score:
                best_score = i.accuracy
                best_model = i.model
        self.best_model = best_model
        self.best_score = best_score
        print(f"Best model accuracy was {best_score}%.")
        self.next(self.end)

    @step
    def end(self):
        print("Done")

if __name__ == "__main__":
    GPUFlow()
python scale_tuning_gpu.py --environment=conda run
    ...
[560/start/2899 (pid 65768)] Task is starting.
[560/start/2899 (pid 65768)] Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz
[560/start/2899 (pid 65768)] 100.0%
[560/start/2899 (pid 65768)] Extracting ./data/cifar-10-python.tar.gz to ./data
[560/start/2899 (pid 65768)] Files already downloaded and verified
[560/start/2899 (pid 65768)] Foreach yields 2 child steps.
[560/start/2899 (pid 65768)] Task finished successfully.
...
[560/join/2904 (pid 65886)] Task is starting.
[560/join/2904 (pid 65886)] Best model accuracy was 46%.
[560/join/2904 (pid 65886)] Task finished successfully.
...
[560/end/2905 (pid 65898)] Task is starting.
[560/end/2905 (pid 65898)] Done
[560/end/2905 (pid 65898)] Task finished successfully.
...
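After the run completes, the selected model and its score remain available as Metaflow artifacts. One way to retrieve them with Metaflow's Client API; this sketch assumes the run above is the latest GPUFlow run:

from metaflow import Flow

# Fetch artifacts from the most recent GPUFlow run.
run = Flow('GPUFlow').latest_run
print(run.data.best_score)   # accuracy of the selected model
model = run.data.best_model  # the trained torch Net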

Further Reading