Skip to main content

Recommender Systems - Episode 6

1How to Deploy our Model behind an Endpoint

The final episode of this tutorial extends our flow once more into a full-fledged, end-to-end workflow. We introduce a few new parameters to govern SageMaker, the AWS service for hosted ML inference. You can follow along with in this flow.

The idea is pretty simple: since now we have a (versioned) artifact that is our tested model, how do we deploy it in real life so that users can get recommendations on what songs should be next in their digital radio?

2Using SageMaker and Metaflow

There are many possible solutions for deploying a KNN model. We pick Sagemaker endpoints here for a few reasons:

  • As an AWS resource, you can spin up and delete the endpoint from Python, directly in your flow - no other configuration is needed!
  • If you're using Metaflow with AWS as a data store (recommended in general, and required by the code below), SageMaker deployments are an elegant way to leverage Metaflow artifact storage: in fact, just point SageMaker to the model on s3!
  • SageMaker is easier to use with one of the pre-defined model types - in this case Tensorflow. In fact, our deployment strategy for the KNN-based model we trained is to first "export" it to a TF-Recs model with keras (the function keras_model), and then deploy it to SageMaker with their TensorFlowModel abstraction. If you wish, you can build your own container and serve predictions directly from the gensim model that we trained - for simplicity, and to showcase another open-source library, we opted here to convert the space to a TF model.

Note that after deployment, getting actual, live predictions in code is as easy as calling result = predictor.predict(input).


You will need access to the appropriate SageMaker execution role to run this code. You can read more about the AmazonSageMakerFullAccess IAM role, as well as more granular permissions in this guide. Once this role is in your IAM policy, we can use it to create SageMaker resources for tasks like training jobs and deploying models to endpoints.


Please consider that:

  • SageMaker allows you to pick a Docker image and hardware. The image we picked is compatible with TensorFlow models. The hardware can be changed but please do so with caution as SageMaker can be very expensive;
  • the code in the following flow automatically deletes the endpoint after making one test prediction - this is to save money. If you wish to test the endpoint for longer, comment out the delete endpoint line and use the same Python code to get predictions from a notebook, for example.

3Deploy your Model from a Flow!

This flow extends episode 4 where we trained several models in parallel. The new steps include

  • The keras_model function helps us package our KNN model using generic TensorFlow abstractions so we can leverage the TensorFlow and Sagemaker integration.
  • The build_retrieval_model calls the keras_model function, zips up the model, and sends it to S3 using Metaflow's built-in S3 client.
  • The deploy step calls the build_retrieval_model function and deploys the resulting model as a Sagemaker endpoint with our choice of the underlying Docker image and hardware capabilities.

# global imports
from metaflow import FlowSpec, step, S3, Parameter, current, card
from import Markdown, Table
import os
import json
import time
from random import choice

class RecSysSagemakerDeployment(FlowSpec):

IS_DEV = Parameter(
help='Flag for dev development, with a smaller dataset',

KNN_K = Parameter(
help='Number of neighbors we retrieve from the vector space',

# NOTE: Sagemaker-specific parameters below here
# If you don't wish to deploy the model, you can leave 'sagemaker_deploy' as 0,
# and ignore the other parameters. Check the README for more details.
help='Deploy KNN model with Sagemaker',

help='Image to use in the Sagemaker endpoint: this is compatible with our TF recs KNN model',

help='AWS instance for the Sagemaker endpoint: this may be expensive!',

help='IAM role in AWS to use to spin up the Sagemaker endpoint',

def start(self):
Start-up: check everything works or fail fast!
from metaflow.metaflow_config import DATASTORE_SYSROOT_S3
print("flow name: %s" % current.flow_name)
print("run id: %s" % current.run_id)
print("username: %s" % current.username)
print("datastore is: %s" % DATASTORE_SYSROOT_S3)
if self.IS_DEV == '1':
if self.SAGEMAKER_DEPLOY == '1':
assert DATASTORE_SYSROOT_S3 is not None

def prepare_dataset(self):
Get the data in the right shape by reading the parquet dataset
and using duckdb SQL-based wrangling to quickly prepare the datasets for
training our Recommender System.
import duckdb
import numpy as np

con = duckdb.connect(database=':memory:')
CONCAT (user_id, '-', playlist) as playlist_id,
CONCAT (artist, '|||', track) as track_id,
FROM 'cleaned_spotify_dataset.parquet'
con.execute("SELECT * FROM playlists LIMIT 1;")

tables = ['row_id', 'user_id', 'track_id', 'playlist_id', 'artist']
for t in tables:
con.execute("SELECT COUNT(DISTINCT({})) FROM playlists;".format(t))
print("# of {}".format(t), con.fetchone()[0])

sampling_cmd = ''
if self.IS_DEV == '1':
print("Subsampling data, since this is DEV")
sampling_cmd = ' USING SAMPLE 10 PERCENT (bernoulli)'

dataset_query = """
LIST(artist ORDER BY row_id ASC) as artist_sequence,
LIST(track_id ORDER BY row_id ASC) as track_sequence,
array_pop_back(LIST(track_id ORDER BY row_id ASC)) as track_test_x,
LIST(track_id ORDER BY row_id ASC)[-1] as track_test_y
GROUP BY playlist_id
HAVING len(track_sequence) > 2

df = con.fetch_df()
print("# rows: {}".format(len(df)))

train, validate, test = np.split(
df.sample(frac=1, random_state=42),
[int(.7 * len(df)), int(.9 * len(df))])

self.df_dataset = df
self.df_train = train
self.df_validate = validate
self.df_test = test
print("# testing rows: {}".format(len(self.df_test)))

self.hypers_sets = [json.dumps(_) for _ in [
{ 'min_count': 3, 'epochs': 30, 'vector_size': 48, 'window': 10, 'ns_exponent': 0.75 },
{ 'min_count': 5, 'epochs': 30, 'vector_size': 48, 'window': 10, 'ns_exponent': 0.75 },
{ 'min_count': 10, 'epochs': 30, 'vector_size': 48, 'window': 10, 'ns_exponent': 0.75 }
# we train K models in parallel, depending how many configurations of hypers
# we set - we generate K set of vectors, and evaluate them on the validation
# set to pick the best combination of parameters!, foreach='hypers_sets')

def predict_next_track(self, vector_space, input_sequence, k):
Given an embedding space, predict best next song with KNN.
Initially, we just take the LAST item in the input playlist as the query item for KNN
and retrieve the top K nearest vectors (you could think of taking the smoothed average embedding
of the input list, for example, as a refinement).

If the query item is not in the vector space, we make a random bet. We could refine this by taking
for example the vector of the artist (average of all songs), or with some other strategy (sampling
by popularity).

For more options on how to generate vectors for "cold items" see for example the paper:
query_item = input_sequence[-1]
if query_item not in vector_space:
query_item = choice(list(vector_space.index_to_key))

return [_[0] for _ in vector_space.most_similar(query_item, topn=k)]

def evaluate_model(self, _df, vector_space, k):
lambda_predict = lambda row: self.predict_next_track(vector_space, row['track_test_x'], k)
_df['predictions'] = _df.apply(lambda_predict, axis=1)
lambda_hit = lambda row: 1 if row['track_test_y'] in row['predictions'] else 0
_df['hit'] = _df.apply(lambda_hit, axis=1)
hit_rate = _df['hit'].sum() / len(_df)
return hit_rate

def generate_embeddings(self):
Generate vector representations for songs, based on the Prod2Vec idea.

For an overview of the algorithm and the evaluation, see for example:
from gensim.models.word2vec import Word2Vec
self.hyper_string = self.input
self.hypers = json.loads(self.hyper_string)
track2vec_model = Word2Vec(self.df_train['track_sequence'], **self.hypers)
print("Training with hypers {} is completed!".format(self.hyper_string))
print("Vector space size: {}".format(len(track2vec_model.wv.index_to_key)))
test_track = choice(list(track2vec_model.wv.index_to_key))
print("Example track: '{}'".format(test_track))
test_vector = track2vec_model.wv[test_track]
print("Test vector for '{}': {}".format(test_track, test_vector[:5]))
test_sims = track2vec_model.wv.most_similar(test_track, topn=3)
print("Similar songs to '{}': {}".format(test_track, test_sims))
self.validation_metric = self.evaluate_model(
print("Hit Rate@{} is: {}".format(self.KNN_K, self.validation_metric))
self.track_vectors = track2vec_model.wv

@card(type='blank', id='hyperCard')
def join_runs(self, inputs):
Join the parallel runs and merge results into a dictionary.
self.all_vectors = { inp.hyper_string: inp.track_vectors for inp in inputs}
self.all_results = { inp.hyper_string: inp.validation_metric for inp in inputs}
print("Current result map: {}".format(self.all_results))
self.best_model, self_best_result = sorted(self.all_results.items(), key=lambda x: x[1], reverse=True)[0]
print("The best validation score is for model: {}, {}".format(self.best_model, self_best_result))
self.final_vectors = self.all_vectors[self.best_model]
self.final_dataset = inputs[0].df_test
current.card.append(Markdown("## Results from parallel training"))
[inp.hyper_string, inp.validation_metric] for inp in inputs

def model_testing(self):
Test the generalization abilities of the best model by running predictions
on the unseen test data.

We report a quantitative point-wise metric, hit rate @ K, as an initial implementation. However,
evaluating recommender systems is a very complex task, and better metrics, through good abstractions,
are available, i.e.
self.test_metric = self.evaluate_model(
print("Hit Rate@{} on the test set is: {}".format(self.KNN_K, self.test_metric))

def keras_model(
all_ids: list,
song_vectors, # np array with vectors
test_id: str,
Build a retrieval model using TF recommender abstraction - by packaging the vector space
in a Keras object, we get for free the possibility of shipping the artifact "as is" to
a Sagemaker endpoint, and benefit from the PaaS abstraction and hardware acceleration.

Of course, other deployment options are possible, including for example using a custom script
and a custom image with Sagemaker.
import tensorflow as tf
import tensorflow_recommenders as tfrs
import numpy as np
embedding_dimension = song_vectors[0].shape[0]
print("Vector space dims: {}".format(embedding_dimension))
unknown_vector = np.zeros((1, embedding_dimension))
print(song_vectors.shape, unknown_vector.shape)
embedding_matrix = np.r_[unknown_vector, song_vectors]
assert embedding_matrix[0][0] == 0.0
embedding_layer = tf.keras.layers.Embedding(len(all_ids) + 1, embedding_dimension), ))
embedding_layer.trainable = False
vector_model = tf.keras.Sequential([
tf.keras.layers.StringLookup(vocabulary=all_ids, mask_token=None),
_v = vector_model(np.array([test_id]))
# debug
# test unknonw ID
print("Test unknown id:")
song_index = tfrs.layers.factorized_top_k.BruteForce(vector_model)
song_index.index(song_vectors, np.array(all_ids))
_, names = song_index(tf.constant([test_id]))
print(f"Recommendations after track '{test_id}': {names[0, :3]}")
return song_index

def build_retrieval_model(self):
Take the embedding space, build a Keras KNN model and store it in S3
so that it can be deployed by a Sagemaker endpoint!

While for simplicity this function is embedded in the deploy step,
you could think of spinning it out as it's own step.
import tarfile
self.model_timestamp = int(round(time.time() * 1000))
model_name = "playlist-recs-model-{}/1".format(self.model_timestamp )
local_tar_name = 'model-{}.tar.gz'.format(self.model_timestamp)
self.test_index = 3
retrieval_model = self.keras_model(
with, mode="w:gz") as _tar:
_tar.add(model_name, recursive=True)
with open(local_tar_name, "rb") as in_file:
data =
with S3(run=self) as s3:
url = s3.put(local_tar_name, data)
print("Model saved at: {}".format(url))
return url

def deploy(self):
Inspired by:

Use SageMaker to deploy the model as a stand-alone, PaaS endpoint, with our choice of the underlying
Docker image and hardware capabilities.

Available images for inferences can be chosen from AWS official list:

import numpy as np
self.all_ids = list(self.final_vectors.index_to_key)
self.startup_embeddings = np.array([self.final_vectors[_] for _ in self.all_ids])
if self.SAGEMAKER_DEPLOY == '0':
print("Skipping deployment to Sagemaker")
self.model_s3_path = self.build_retrieval_model()
from sagemaker.tensorflow import TensorFlowModel
self.ENDPOINT_NAME = 'playlist-recs-{}-endpoint'.format(self.model_timestamp)
print("\n\n================\nEndpoint name is: {}\n\n".format(self.ENDPOINT_NAME))
model = TensorFlowModel(
predictor = model.deploy(
input = {'instances': np.array([self.all_ids[self.test_index]])}
result = predictor.predict(input)
print(input, result)
print("Deleting endpoint now...")
print("Endpoint deleted!")

def end(self):

if __name__ == '__main__':

4Run Your Flow

To run this flow and deploy to Sagemaker, you will need:

  • Access to a Metaflow deployment on AWS. Reach us on Slack if you need help getting set up!
  • Your active Metaflow profile to be configured with an S3 as the DATASTORE_SYSROOT_S3 variable. You can find the default config at $HOME/.metaflowconfig/config.json. Read more here.
  • To set the argument --sagemaker_deploy 1.
  • To set the argument --sagemaker_role <YOUR SAGEMAKER EXECUTION ROLE>.
python run --sagemaker_deploy 1 --sagemaker_role <SAGEMAKER_ROLE>
     Workflow starting (run-id 187483):
[187483/start/1012249 (pid 74483)] Task is starting.
[187483/start/1012249 (pid 74483)] flow name: RecSysSagemakerDeployment
[187483/start/1012249 (pid 74483)] run id: 187483
[187483/start/1012249 (pid 74483)] username: eddie
[187483/start/1012249 (pid 74483)] ATTENTION: RUNNING AS DEV VERSION - DATA WILL BE SUB-SAMPLED!!!
[187483/start/1012249 (pid 74483)] ATTENTION: DEPLOYMENT TO SAGEMAKER IS ENABLED!
[187483/start/1012249 (pid 74483)] Task finished successfully.
[187483/prepare_dataset/1012250 (pid 74487)] Task is starting.
[187483/prepare_dataset/1012250 (pid 74487)] (0, '9cc0cfd4d7d7885102480dd99e7a90d6', 'Elvis Costello', '(The Angels Wanna Wear My) Red Shoes', 'HARD ROCK 2010', '9cc0cfd4d7d7885102480dd99e7a90d6-HARD ROCK 2010', 'Elvis Costello|||(The Angels Wanna Wear My) Red Shoes')
[187483/prepare_dataset/1012250 (pid 74487)] # of row_id 12891680
[187483/prepare_dataset/1012250 (pid 74487)] # of user_id 15918
[187483/prepare_dataset/1012250 (pid 74487)] # of track_id 2819059
[187483/prepare_dataset/1012250 (pid 74487)] # of playlist_id 232369
[187483/prepare_dataset/1012250 (pid 74487)] # of artist 289821
[187483/prepare_dataset/1012250 (pid 74487)] Subsampling data, since this is DEV
[187483/prepare_dataset/1012250 (pid 74487)] # rows: 21521
[187483/prepare_dataset/1012250 (pid 74487)] ['59f776b2f9de1de9969bc43bcd3fca59-Rush', ['Iron Maiden', 'Rush', 'Rush', 'Judas Priest'], ['Iron Maiden|||2 Minutes To Midnight', 'Rush|||Limelight', 'Rush|||Tom Sawyer', "Judas Priest|||You've Got Another Thing Comin'"], ['Iron Maiden|||2 Minutes To Midnight', 'Rush|||Limelight', 'Rush|||Tom Sawyer'], "Judas Priest|||You've Got Another Thing Comin'"]
[187483/prepare_dataset/1012250 (pid 74487)] # testing rows: 2153
[187483/prepare_dataset/1012250 (pid 74487)] Foreach yields 3 child steps.
[187483/prepare_dataset/1012250 (pid 74487)] Task finished successfully.
[187483/generate_embeddings/1012251 (pid 74518)] Task is starting.
[187483/generate_embeddings/1012252 (pid 74521)] Task is starting.
[187483/generate_embeddings/1012253 (pid 74524)] Task is starting.
[187483/generate_embeddings/1012253 (pid 74524)] Training with hypers {"min_count": 10, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75} is completed!
[187483/generate_embeddings/1012253 (pid 74524)] Vector space size: 9379
[187483/generate_embeddings/1012253 (pid 74524)] Example track: 'Allah-Las|||Catamaran'
[187483/generate_embeddings/1012253 (pid 74524)] Test vector for 'Allah-Las|||Catamaran': [-0.5164605 -0.39848462 0.48366475 0.06304073 0.16176549]
[187483/generate_embeddings/1012253 (pid 74524)] Similar songs to 'Allah-Las|||Catamaran': [('Kimbra|||Cameo Lover', 0.965700626373291), ('Augustines|||Chapel Song', 0.9608513712882996), ('My Morning Jacket|||Circuital', 0.960833728313446)]
[187483/generate_embeddings/1012253 (pid 74524)] Hit Rate@100 is: 0.06342936802973978
[187483/generate_embeddings/1012251 (pid 74518)] Training with hypers {"min_count": 3, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75} is completed!
[187483/generate_embeddings/1012251 (pid 74518)] Vector space size: 62838
[187483/generate_embeddings/1012251 (pid 74518)] Example track: 'Queen|||Crazy Little Thing Called Love - 2011 Remaster'
[187483/generate_embeddings/1012251 (pid 74518)] Test vector for 'Queen|||Crazy Little Thing Called Love - 2011 Remaster': [-0.40840358 -0.28218955 -0.67459786 0.5966493 0.71324164]
[187483/generate_embeddings/1012251 (pid 74518)] Similar songs to 'Queen|||Crazy Little Thing Called Love - 2011 Remaster': [('Brandon Flowers|||Crossfire', 0.9929628372192383), ('Styx|||Crystal Ball', 0.9924074411392212), ('Madonna|||Crazy for You', 0.9904692769050598)]
[187483/generate_embeddings/1012252 (pid 74521)] Training with hypers {"min_count": 5, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75} is completed!
[187483/generate_embeddings/1012252 (pid 74521)] Vector space size: 28303
[187483/generate_embeddings/1012252 (pid 74521)] Example track: 'Bone Thugs-N-Harmony|||1st Of Tha Month'
[187483/generate_embeddings/1012252 (pid 74521)] Test vector for 'Bone Thugs-N-Harmony|||1st Of Tha Month': [-0.03574348 -0.05404772 0.24944994 -0.16587584 0.324895 ]
[187483/generate_embeddings/1012252 (pid 74521)] Similar songs to 'Bone Thugs-N-Harmony|||1st Of Tha Month': [('Ana Tijoux|||1977', 0.885529637336731), ('Hamilton Leithauser|||Alexandra', 0.8741297125816345), ('Ja Rule|||Always On Time', 0.8725541234016418)]
[187483/generate_embeddings/1012251 (pid 74518)] Hit Rate@100 is: 0.07969330855018587
[187483/generate_embeddings/1012252 (pid 74521)] Hit Rate@100 is: 0.08921933085501858
[187483/generate_embeddings/1012253 (pid 74524)] Task finished successfully.
[187483/generate_embeddings/1012251 (pid 74518)] Task finished successfully.
[187483/generate_embeddings/1012252 (pid 74521)] Task finished successfully.
[187483/join_runs/1012254 (pid 74569)] Task is starting.
[187483/join_runs/1012254 (pid 74569)] Current result map: {'{"min_count": 3, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75}': 0.07969330855018587, '{"min_count": 5, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75}': 0.08921933085501858, '{"min_count": 10, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75}': 0.06342936802973978}
[187483/join_runs/1012254 (pid 74569)] The best validation score is for model: {"min_count": 5, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75}, 0.08921933085501858
[187483/join_runs/1012254 (pid 74569)] Task finished successfully.
[187483/model_testing/1012255 (pid 74580)] Task is starting.
[187483/model_testing/1012255 (pid 74580)] Hit Rate@100 on the test set is: 0.08360427310729215
[187483/model_testing/1012255 (pid 74580)] Task finished successfully.
[187483/deploy/1012256 (pid 74588)] Task is starting.
[187483/deploy/1012256 (pid 74588)] Vector space dims: 48
[187483/deploy/1012256 (pid 74588)] (28303, 48) (1, 48)
[187483/deploy/1012256 (pid 74588)] (28304, 48)
[187483/deploy/1012256 (pid 74588)] [-0.44227722 0.62302357 2.6069138 ]
[187483/deploy/1012256 (pid 74588)] tf.Tensor([-0.44227722 0.62302357 2.6069138 ], shape=(3,), dtype=float32)
[187483/deploy/1012256 (pid 74588)] Test unknown id:
[187483/deploy/1012256 (pid 74588)] tf.Tensor([0. 0. 0.], shape=(3,), dtype=float32)
[187483/deploy/1012256 (pid 74588)] Recommendations after track 'Daft Punk|||Get Lucky - Radio Edit': [b'Daft Punk|||Get Lucky - Radio Edit' b'Daft Punk|||Get Lucky'
[187483/deploy/1012256 (pid 74588)] WARNING:absl:Found untraced functions such as query_with_exclusions while saving (showing 1 of 1). These functions will not be directly callable after loading.
[187483/deploy/1012256 (pid 74588)] b'PSY|||Gangnam Style (\xea\xb0\x95\xeb\x82\xa8\xec\x8a\xa4\xed\x83\x80\xec\x9d\xbc)']
[187483/deploy/1012256 (pid 74588)]
[187483/deploy/1012256 (pid 74588)] WARNING:sagemaker.deprecations:update_endpoint is a no-op in sagemaker>=2.
[187483/deploy/1012256 (pid 74588)]
[187483/deploy/1012256 (pid 74588)] ================
[187483/deploy/1012256 (pid 74588)] Endpoint name is: playlist-recs-1669682006179-endpoint
[187483/deploy/1012256 (pid 74588)]
[187483/deploy/1012256 (pid 74588)]
[187483/deploy/1012256 (pid 74588)] ------------!{'instances': array(['Daft Punk|||Get Lucky - Radio Edit'], dtype='<U34')} {'predictions': [{'output_1': [327.358551, 309.735748, 239.649109, 236.526245, 230.304749, 227.55574, 226.75943, 226.298782, 225.014877, 221.921616], 'output_2': ['Daft Punk|||Get Lucky - Radio Edit', 'Daft Punk|||Get Lucky', 'PSY|||Gangnam Style (강남스타일)', 'Avicii|||Hey Brother', 'Pharrell Williams|||Happy - From Despicable Me 2""', 'Pharrell Williams|||Happy', 'Katy Perry|||Firework', 'Daft Punk|||Giorgio by Moroder', "Swedish House Mafia|||Don't You Worry Child (Radio Edit) [feat. John Martin]", 'Swedish House Mafia|||Greyhound']}]}
1 task is running: deploy (1 running; 0 done).
No tasks are waiting in the queue.
end step has not started
[187483/deploy/1012256 (pid 74588)] Deleting endpoint now...
[187483/deploy/1012256 (pid 74588)] Endpoint deleted!
[187483/deploy/1012256 (pid 74588)] See: for details.
[187483/deploy/1012256 (pid 74588)] Task finished successfully.
[187483/end/1012257 (pid 74676)] Task is starting.
[187483/end/1012257 (pid 74676)] All done
[187483/end/1012257 (pid 74676)]
[187483/end/1012257 (pid 74676)] See you, space cowboy
[187483/end/1012257 (pid 74676)]
[187483/end/1012257 (pid 74676)] Task finished successfully.


Congratulations, you have completed Metaflow's introductory tutorial on recommender system workflows! You have learned how to:

  • take a recommender system idea from prototype to real-time production;
  • leverage Metaflow to train different versions of the same model and pick the best one;
  • use Metaflow cards to save important details about model performance;
  • package a representation of your data in a keras object that you can deploy directly from the flow to a cloud endpoint with AWS Sagemaker.

To keep progressing in your Metaflow journey you can:

  • Check out the open-source repository.
  • Join our Slack community and learn with us in #ask-metaflow.
    • We are actively working on more advanced recommender system tutorials. Please send us your suggestions and questions!