Skip to main content

Deploy with Sagemaker

Question

How can I train a model in my flow and deploy it with Sagemaker?

Solution

Sagemaker allows you to host a model on an EC2 instance and gives you an endpoint you can make requests to.

First there are some configuration details you will need to address to deploy a model from your flow.

1Set Sagemaker IAM Role

To use Sagemaker as shown on this page you will need an appropriate IAM Role configured for Sagemaker execution.

AWS has managed policies you can use for this like AmazonSageMakerFullAccess or the DataScientist role.

2Configuration

Next we will configure important environment variables. In this example a .env file is used to manage environment variables for:

  • An IAM role defined for Sagemaker Execution.
  • An S3 prefix URI where Sagemaker can upload your custom code.
my.env
ROLE=<YOUR IAM ROLE>
CODE_LOCATION=<S3 URI TO PUSH YOUR CUSTOM CODE TO>

3Define Sagemaker Entry Point

Here is the file defining the entry point that will be passed to the Sagemaker deployment. This file includes the model_fn function which will load the model when a prediction is requested. There are other functions related to model serving that are not included in this file because the SageMaker scikit-learn model server has a default implementation for those functions.

sagemaker_entry_point.py
import joblib
import os

def model_fn(model_dir):
return joblib.load(os.path.join(model_dir, "model/model.joblib"))

4Define Deployment Logic

The following script contains a light wrapper around the SageMaker SDK that will be called from the flow:

deployer.py
import os
import joblib
import shutil
import tarfile
import numpy as np
from sklearn.base import BaseEstimator
from sagemaker.sklearn import SKLearnModel
from metaflow import S3
from dotenv import load_dotenv
load_dotenv('my.env')

def to_sagemaker(
model:BaseEstimator = None,
sagemaker_model_name:str = "model",
model_save_name:str = "model",
endpoint_name:str = "sklearn endpoint",
instance_type:str = "ml.c5.2xlarge",
entry_point:str = "sagemaker_entry_point.py",
sklearn_version:str = "1.0-1",
role:str = os.getenv('ROLE'),
code_location:str = os.getenv('CODE_LOCATION'),
run = None,
):

# save model to local folder
# this should match what is in sagemaker_entry_point
model_save_name = "model"
os.makedirs(model_save_name, exist_ok=True)
out_path = "{}/{}.joblib".format(model_save_name, model_save_name)
joblib.dump(model, out_path)

# save model as tar.gz
local_tar_name = "{}.tar.gz".format(model_save_name)
with tarfile.open(local_tar_name,
mode="w:gz") as _tar:
_tar.add(model_save_name, recursive=True)

# save model onto S3
with S3(run=run) as s3:
with open(local_tar_name, "rb") as in_file:
data = in_file.read()
model_s3_path = s3.put(local_tar_name, data)

# remove local model folder and tar
shutil.rmtree(model_save_name)
os.remove(local_tar_name)

print("Creating and deploying Sagemaker model...")
sklearn_model = SKLearnModel(
name=sagemaker_model_name,
model_data=model_s3_path,
role=role,
entry_point=entry_point,
framework_version=sklearn_version,
code_location=code_location
)

predictor = sklearn_model.deploy(
instance_type=instance_type,
initial_instance_count=1,
endpoint_name=endpoint_name
)

return model_s3_path

5Run Flow

Here is a flow that shows how to:

  • Train a scikit-learn model.
  • Save the model on S3.
  • Deploy the model with Sagemaker.
  • Verify the model is predicting the same locally and in deployment.
  • (optionally) Clean up the Sagemaker deployment resources.
deploy_to_sagemaker.py
from metaflow import FlowSpec, step, S3, conda_base
import os
import json

class DeployToSagemakerFlow(FlowSpec):

@step
def start(self):
from sklearn import datasets
from sklearn.model_selection import train_test_split
self.iris = datasets.load_iris()
X, y = self.iris['data'], self.iris['target']
self.labels = self.iris['target_names']
split = train_test_split(X, y, test_size=0.2)
self.X_train, self.X_test = split[0], split[1]
self.y_train, self.y_test = split[2], split[3]
self.next(self.train_rf_model)

@step
def train_rf_model(self):
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
self.clf = RandomForestClassifier(random_state=0)
self.clf.fit(self.X_train, self.y_train)
# will use local preds as test of `deploy` step.
self.local_y_pred = self.clf.predict(self.X_test)
self.next(self.deploy)

@step
def deploy(self):
import time
import deployer
t = int(round(time.time() * 1000))
self.sagemaker_model_name = 'rf-model-{}'.format(t)
self.model_save_name = 'model'
self.endpoint_name = 'rf-endpoint-{}'.format(t)
self.instance_type = 'ml.c5.2xlarge'
self.entry_point = 'sagemaker_entry_point.py'
self.sklearn_sage_version = '1.0-1'
self.model_s3_path = deployer.to_sagemaker(
model = self.clf,
sagemaker_model_name = self.sagemaker_model_name,
model_save_name = self.model_save_name,
endpoint_name = self.endpoint_name,
instance_type = self.instance_type,
entry_point = self.entry_point,
sklearn_version = self.sklearn_sage_version,
role = os.getenv('ROLE'),
code_location=os.getenv('CODE_LOCATION'),
run = self
)
self.next(self.end)

@step
def end(self):
print("\nModel name is: {}".format(
self.sagemaker_model_name))
print("Endpoint name is: {}\n".format(
self.endpoint_name))

if __name__ == "__main__":
DeployToSagemakerFlow()
python deploy_to_sagemaker.py run
Workflow starting (run-id 847):
[847/start/4208 (pid 8886)] Task is starting.
[847/start/4208 (pid 8886)] Task finished successfully.
[847/train_rf_model/4209 (pid 8900)] Task is starting.
[847/train_rf_model/4209 (pid 8900)] Task finished successfully.
[847/deploy/4210 (pid 8923)] Task is starting.
[847/deploy/4210 (pid 8923)] Creating and deploying Sagemaker model...
[847/deploy/4210 (pid 8923)] -----!
[847/deploy/4210 (pid 8923)] Task finished successfully.
[847/end/4211 (pid 8960)] Task is starting.
[847/end/4211 (pid 8960)]
[847/end/4211 (pid 8960)] Model name is: rf-model-1657218689272
[847/end/4211 (pid 8960)] Endpoint name is: rf-endpoint-1657218689272
[847/end/4211 (pid 8960)]
[847/end/4211 (pid 8960)] Task finished successfully.
Done!

You can see your endpoint by going to the Sagemaker section of AWS console and clicking > inference > endpoints. It will take a few minutes for the model and endpoint to create.

6Make Prediction

Now you can make a prediction with your deployed model! You can run the following code in a notebook cell or Python script after executing the flow. The snippet uses data that Metaflow has stored to:

  • Load X_test data from the start step.
  • Send the data to the endpoint to request predictions.
  • Parse the response.
  • Ensure the deployed model predictions are the same as those of the local model version.
from metaflow import Flow
import boto3
import pandas as pd
import numpy as np
import re

sagemaker_runtime = boto3.client("sagemaker-runtime",
region_name='us-east-2')

# load metaflow run data
run_data = Flow('DeployToSagemakerFlow').latest_run.data

# request prediction from model
response = sagemaker_runtime.invoke_endpoint(
EndpointName=run_data.endpoint_name,
Body=pd.DataFrame(run_data.X_test).to_csv(
header=False, index=False),
ContentType="text/csv",
)

# parse response
predictions = [int(re.sub("[^0-9]", "", s))
for s in response["Body"]
.read().decode("utf-8").split(',')]

# ensure deployed model is behaving as expected
assert np.all(run_data.local_y_pred == predictions)

accuracy = sum(run_data.y_test == predictions) / len(predictions)
print("Accuracy: {}".format(round(accuracy, 3)))
Accuracy: 1.0

7Clean up Sagemaker Resources

from sagemaker import Session
from metaflow import Flow

run_data = Flow('DeployToSagemakerFlow').latest_run.data
model_name=run_data.sagemaker_model_name
endpoint_name=run_data.endpoint_name

sagemaker_session = Session()
sm_client = sagemaker_session.boto_session.client("sagemaker")
sm_client.delete_endpoint(EndpointName=endpoint_name)
sm_client.delete_model(ModelName=model_name)
{'ResponseMetadata': {'RequestId': '694abcde-f0e3-4c9d-a199-8946096bdb20',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': '694abcde-f0e3-4c9d-a199-8946096bdb20',
'content-type': 'application/x-amz-json-1.1',
'content-length': '0',
'date': 'Thu, 07 Jul 2022 18:34:30 GMT'},
'RetryAttempts': 0}}

Further Reading

For help or feedback, please join Metaflow Slack. To suggest an article, you may open an issue on GitHub.

Join Slack