
Recommender Systems with Metaflow: Beginner Tutorial

This RecSys tutorial is a first adventure in training recommender systems with Metaflow, building a scalable workflow you can use for both experimentation and production. The goal is to develop a relatively simple, effective, and general pipeline for sequential recommendations, and to show how you can use popular open-source libraries and tools, including DuckDB, Gensim, Metaflow, and Keras, to go from raw data to a fully working cloud endpoint that serves predictions in real time.

Recommender System with Metaflow

# global imports
from metaflow import FlowSpec, step, S3, Parameter, current, card
from metaflow.cards import Markdown, Table
import os
import json
import time
from random import choice


class RecSysSagemakerDeployment(FlowSpec):

    IS_DEV = Parameter(
        name='is_dev',
        help='Flag for development mode, which uses a smaller dataset',
        default='1'
    )

    KNN_K = Parameter(
        name='knn_k',
        help='Number of neighbors we retrieve from the vector space',
        default='100'
    )

    # NOTE: Sagemaker-specific parameters below here.
    # If you don't wish to deploy the model, you can leave 'sagemaker_deploy' as 0
    # and ignore the other parameters. Check the README for more details.
    SAGEMAKER_DEPLOY = Parameter(
        name='sagemaker_deploy',
        help='Deploy KNN model with Sagemaker',
        default='0'
    )

    SAGEMAKER_IMAGE = Parameter(
        name='sagemaker_image',
        help='Image to use in the Sagemaker endpoint: this is compatible with our TF recs KNN model',
        default='763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:2.7.0-gpu-py38-cu112-ubuntu20.04-sagemaker'
    )

    SAGEMAKER_INSTANCE = Parameter(
        name='sagemaker_instance',
        help='AWS instance for the Sagemaker endpoint: this may be expensive!',
        default='ml.p3.2xlarge'
    )

    SAGEMAKER_ROLE = Parameter(
        name='sagemaker_role',
        help='IAM role in AWS to use to spin up the Sagemaker endpoint',
        default='MetaSageMakerRole'
    )

    @step
    def start(self):
        """
        Start-up: check everything works or fail fast!
        """
        from metaflow.metaflow_config import DATASTORE_SYSROOT_S3
        print("flow name: %s" % current.flow_name)
        print("run id: %s" % current.run_id)
        print("username: %s" % current.username)
        print("datastore is: %s" % DATASTORE_SYSROOT_S3)
        if self.IS_DEV == '1':
            print("ATTENTION: RUNNING AS DEV VERSION - DATA WILL BE SUB-SAMPLED!!!")
        if self.SAGEMAKER_DEPLOY == '1':
            print("ATTENTION: DEPLOYMENT TO SAGEMAKER IS ENABLED!")
        assert DATASTORE_SYSROOT_S3 is not None
        self.next(self.prepare_dataset)

    @step
    def prepare_dataset(self):
        """
        Get the data in the right shape by reading the parquet dataset
        and using duckdb SQL-based wrangling to quickly prepare the datasets for
        training our Recommender System.
        """
        import duckdb
        import numpy as np

        con = duckdb.connect(database=':memory:')
        con.execute("""
            CREATE TABLE playlists AS
            SELECT *,
                   CONCAT(user_id, '-', playlist) as playlist_id,
                   CONCAT(artist, '|||', track) as track_id
            FROM 'cleaned_spotify_dataset.parquet'
            ;
        """)
        con.execute("SELECT * FROM playlists LIMIT 1;")
        print(con.fetchone())

        # print the cardinality of the main columns, as a sanity check
        columns = ['row_id', 'user_id', 'track_id', 'playlist_id', 'artist']
        for c in columns:
            con.execute("SELECT COUNT(DISTINCT({})) FROM playlists;".format(c))
            print("# of {}".format(c), con.fetchone()[0])

        sampling_cmd = ''
        if self.IS_DEV == '1':
            print("Subsampling data, since this is DEV")
            sampling_cmd = ' USING SAMPLE 10 PERCENT (bernoulli)'

        # for each playlist, keep the full track sequence, the same sequence minus
        # the last track (track_test_x), and the last track alone (track_test_y)
        # for next-track evaluation
        dataset_query = """
            SELECT * FROM
            (
                SELECT
                    playlist_id,
                    LIST(artist ORDER BY row_id ASC) as artist_sequence,
                    LIST(track_id ORDER BY row_id ASC) as track_sequence,
                    array_pop_back(LIST(track_id ORDER BY row_id ASC)) as track_test_x,
                    LIST(track_id ORDER BY row_id ASC)[-1] as track_test_y
                FROM
                    playlists
                GROUP BY playlist_id
                HAVING len(track_sequence) > 2
            )
            {}
            ;
        """.format(sampling_cmd)

        con.execute(dataset_query)
        df = con.fetch_df()
        print("# rows: {}".format(len(df)))
        print(df.iloc[0].tolist())
        con.close()

        # random 70/20/10 split into train, validation and test sets
        train, validate, test = np.split(
            df.sample(frac=1, random_state=42),
            [int(.7 * len(df)), int(.9 * len(df))])

        self.df_dataset = df
        self.df_train = train
        self.df_validate = validate
        self.df_test = test
        print("# testing rows: {}".format(len(self.df_test)))

        self.hypers_sets = [json.dumps(_) for _ in [
            {'min_count': 3, 'epochs': 30, 'vector_size': 48, 'window': 10, 'ns_exponent': 0.75},
            {'min_count': 5, 'epochs': 30, 'vector_size': 48, 'window': 10, 'ns_exponent': 0.75},
            {'min_count': 10, 'epochs': 30, 'vector_size': 48, 'window': 10, 'ns_exponent': 0.75}
        ]]
        # we train K models in parallel, depending on how many configurations of hypers
        # we set - we generate K sets of vectors, and evaluate them on the validation
        # set to pick the best combination of parameters!
        self.next(self.generate_embeddings, foreach='hypers_sets')

    def predict_next_track(self, vector_space, input_sequence, k):
        """
        Given an embedding space, predict the best next song with KNN.
        Initially, we just take the LAST item in the input playlist as the query item for KNN
        and retrieve the top K nearest vectors (you could think of taking the smoothed average embedding
        of the input list, for example, as a refinement).

        If the query item is not in the vector space, we make a random bet. We could refine this by taking
        for example the vector of the artist (average of all songs), or with some other strategy (sampling
        by popularity).

        For more options on how to generate vectors for "cold items" see for example the paper:
        https://dl.acm.org/doi/10.1145/3383313.3411477
        """
        query_item = input_sequence[-1]
        if query_item not in vector_space:
            query_item = choice(list(vector_space.index_to_key))

        return [_[0] for _ in vector_space.most_similar(query_item, topn=k)]

    def evaluate_model(self, _df, vector_space, k):
        # hit rate @ k: the fraction of sequences for which the held-out track
        # appears among the top k predictions
        lambda_predict = lambda row: self.predict_next_track(vector_space, row['track_test_x'], k)
        _df['predictions'] = _df.apply(lambda_predict, axis=1)
        lambda_hit = lambda row: 1 if row['track_test_y'] in row['predictions'] else 0
        _df['hit'] = _df.apply(lambda_hit, axis=1)
        hit_rate = _df['hit'].sum() / len(_df)
        return hit_rate

    @step
    def generate_embeddings(self):
        """
        Generate vector representations for songs, based on the Prod2Vec idea.

        For an overview of the algorithm and the evaluation, see for example:
        https://arxiv.org/abs/2007.14906
        """
        from gensim.models.word2vec import Word2Vec
        self.hyper_string = self.input
        self.hypers = json.loads(self.hyper_string)
        track2vec_model = Word2Vec(self.df_train['track_sequence'], **self.hypers)
        print("Training with hypers {} is completed!".format(self.hyper_string))
        print("Vector space size: {}".format(len(track2vec_model.wv.index_to_key)))
        test_track = choice(list(track2vec_model.wv.index_to_key))
        print("Example track: '{}'".format(test_track))
        test_vector = track2vec_model.wv[test_track]
        print("Test vector for '{}': {}".format(test_track, test_vector[:5]))
        test_sims = track2vec_model.wv.most_similar(test_track, topn=3)
        print("Similar songs to '{}': {}".format(test_track, test_sims))
        self.validation_metric = self.evaluate_model(
            self.df_validate,
            track2vec_model.wv,
            k=int(self.KNN_K))
        print("Hit Rate@{} is: {}".format(self.KNN_K, self.validation_metric))
        self.track_vectors = track2vec_model.wv
        self.next(self.join_runs)

    @card(type='blank', id='hyperCard')
    @step
    def join_runs(self, inputs):
        """
        Join the parallel runs and merge results into a dictionary.
        """
        self.all_vectors = {inp.hyper_string: inp.track_vectors for inp in inputs}
        self.all_results = {inp.hyper_string: inp.validation_metric for inp in inputs}
        print("Current result map: {}".format(self.all_results))
        # pick the hyperparameter set with the best validation metric
        self.best_model, self.best_result = sorted(
            self.all_results.items(), key=lambda x: x[1], reverse=True)[0]
        print("The best validation score is for model: {}, {}".format(self.best_model, self.best_result))
        self.final_vectors = self.all_vectors[self.best_model]
        self.final_dataset = inputs[0].df_test
        current.card.append(Markdown("## Results from parallel training"))
        current.card.append(
            Table([
                [inp.hyper_string, inp.validation_metric] for inp in inputs
            ])
        )
        self.next(self.model_testing)

    @step
    def model_testing(self):
        """
        Test the generalization abilities of the best model by running predictions
        on the unseen test data.

        We report a quantitative point-wise metric, hit rate @ K, as an initial implementation. However,
        evaluating recommender systems is a very complex task, and better metrics, through good abstractions,
        are available, e.g. https://reclist.io/.
        """
        self.test_metric = self.evaluate_model(
            self.final_dataset,
            self.final_vectors,
            k=int(self.KNN_K))
        print("Hit Rate@{} on the test set is: {}".format(self.KNN_K, self.test_metric))
        self.next(self.deploy)

    def keras_model(
        self,
        all_ids: list,
        song_vectors,  # np array with vectors
        test_id: str,
        test_vector
    ):
        """
        Build a retrieval model using the TF recommenders abstraction - by packaging the vector space
        in a Keras object, we get for free the possibility of shipping the artifact "as is" to
        a Sagemaker endpoint, and benefit from the PaaS abstraction and hardware acceleration.

        Of course, other deployment options are possible, including for example using a custom script
        and a custom image with Sagemaker.
        """
        import tensorflow as tf
        import tensorflow_recommenders as tfrs
        import numpy as np
        embedding_dimension = song_vectors[0].shape[0]
        print("Vector space dims: {}".format(embedding_dimension))
        # prepend an all-zero vector for unknown IDs (index 0 of the StringLookup)
        unknown_vector = np.zeros((1, embedding_dimension))
        print(song_vectors.shape, unknown_vector.shape)
        embedding_matrix = np.r_[unknown_vector, song_vectors]
        print(embedding_matrix.shape)
        assert embedding_matrix[0][0] == 0.0
        embedding_layer = tf.keras.layers.Embedding(len(all_ids) + 1, embedding_dimension)
        embedding_layer.build((None, ))
        embedding_layer.set_weights([embedding_matrix])
        embedding_layer.trainable = False
        vector_model = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=all_ids, mask_token=None),
            embedding_layer
        ])
        _v = vector_model(np.array([test_id]))
        # debug: the model output should match the original gensim vector
        print(test_vector[:3])
        print(_v[0][:3])
        # test unknown ID
        print("Test unknown id:")
        print(vector_model(np.array(['blahdagkagda']))[0][:3])
        song_index = tfrs.layers.factorized_top_k.BruteForce(vector_model)
        song_index.index(song_vectors, np.array(all_ids))
        _, names = song_index(tf.constant([test_id]))
        print(f"Recommendations after track '{test_id}': {names[0, :3]}")
        return song_index

    def build_retrieval_model(self):
        """
        Take the embedding space, build a Keras KNN model and store it in S3
        so that it can be deployed by a Sagemaker endpoint!

        While for simplicity this function is embedded in the deploy step,
        you could think of spinning it out as its own step.
        """
        import tarfile
        # version the model based on the current timestamp
        self.model_timestamp = int(round(time.time() * 1000))
        model_name = "playlist-recs-model-{}/1".format(self.model_timestamp)
        local_tar_name = 'model-{}.tar.gz'.format(self.model_timestamp)
        self.test_index = 3
        retrieval_model = self.keras_model(
            self.all_ids,
            self.startup_embeddings,
            self.all_ids[self.test_index],
            self.startup_embeddings[self.test_index]
        )
        # save the Keras model locally, tar it, and upload it to the Metaflow S3 datastore
        retrieval_model.save(filepath=model_name)
        with tarfile.open(local_tar_name, mode="w:gz") as _tar:
            _tar.add(model_name, recursive=True)
        with open(local_tar_name, "rb") as in_file:
            data = in_file.read()
            with S3(run=self) as s3:
                url = s3.put(local_tar_name, data)
                print("Model saved at: {}".format(url))
                return url

    @step
    def deploy(self):
        """
        Inspired by: https://github.com/jacopotagliabue/no-ops-machine-learning/blob/main/flow/training.py

        Use SageMaker to deploy the model as a stand-alone, PaaS endpoint, with our choice of the underlying
        Docker image and hardware capabilities.

        Available images for inference can be chosen from the AWS official list:
        https://github.com/aws/deep-learning-containers/blob/master/available_images.md
        """
        import numpy as np
        self.all_ids = list(self.final_vectors.index_to_key)
        self.startup_embeddings = np.array([self.final_vectors[_] for _ in self.all_ids])
        if self.SAGEMAKER_DEPLOY == '0':
            print("Skipping deployment to Sagemaker")
        else:
            self.model_s3_path = self.build_retrieval_model()
            from sagemaker.tensorflow import TensorFlowModel
            self.ENDPOINT_NAME = 'playlist-recs-{}-endpoint'.format(self.model_timestamp)
            print("\n\n================\nEndpoint name is: {}\n\n".format(self.ENDPOINT_NAME))
            model = TensorFlowModel(
                model_data=self.model_s3_path,
                image_uri=self.SAGEMAKER_IMAGE,
                role=self.SAGEMAKER_ROLE
            )
            predictor = model.deploy(
                initial_instance_count=1,
                instance_type=self.SAGEMAKER_INSTANCE,
                endpoint_name=self.ENDPOINT_NAME
            )
            # smoke-test the endpoint with a known track, then tear it down
            # (renamed from 'input' to avoid shadowing the built-in)
            payload = {'instances': np.array([self.all_ids[self.test_index]])}
            result = predictor.predict(payload)
            print(payload, result)
            print("Deleting endpoint now...")
            predictor.delete_endpoint()
            print("Endpoint deleted!")
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == '__main__':
    RecSysSagemakerDeployment()
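
If you want to run the flow yourself, save it to a file and launch it with the Metaflow CLI; every Parameter defined in the class becomes a command-line flag. A typical local invocation looks like this (the file name is hypothetical):

python recsys_flow.py run --is_dev 1 --knn_k 100 --sagemaker_deploy 0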

If you want to code along, you can open up your sandbox, skip the rest of this setup page, and follow Hugo and Jacopo Tagliabue in the How to Build a Full-Stack Recommender System Live Code Along on YouTube.

Objectives

Our use case is the following: given a training set of music playlists (lists of songs hand-curated by users), can we suggest what to listen to next when presented with a new song?
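
Concretely, the flow evaluates this task by holding out the last track of each playlist and checking whether the model can recover it. Here is a minimal sketch of that setup (the track IDs are hypothetical; the variable names mirror the track_test_x and track_test_y columns built in the prepare_dataset step, and the 'artist|||track' ID format built by the SQL query):

playlist = ['Artist A|||Song 1', 'Artist B|||Song 2', 'Artist C|||Song 3']
track_test_x = playlist[:-1]  # input sequence: every track except the last
track_test_y = playlist[-1]   # held-out target: the track we hope to predict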

By following along you will learn how to:

  • take a recommender system idea from prototype to real-time production;
  • leverage Metaflow to train different versions of the same model and pick the best one;
  • use Metaflow cards to save important details about model performance;
  • package a representation of your data in a Keras object that you can deploy directly from the flow to a cloud endpoint with AWS Sagemaker.

Prerequisites

This tutorial does not assume knowledge about recommender systems, but does assume the following:

  • you are familiar with the basic concepts of Metaflow (flows, steps, tasks, the client API, etc.), and know how to run a flow;
  • you are familiar with the basic concepts of machine learning such as training, validation, and test split for model evaluation.

If you are new to Metaflow we recommend starting with the introductory tutorials to get up to speed on the basics of Metaflow before returning to this tutorial.

Bonus points (no worries, we will explain all of this) if you:

  • have experience with model serving;
  • know a little bit about what embeddings or neural networks are;
  • are comfortable with AWS concepts such as storing data in S3 and using SageMaker.

Tutorial Structure

The tutorial is organized as a series of episodes. Each episode contains either a Metaflow script to run or a Jupyter notebook. You do not need access to cloud computing or a Metaflow deployment to complete the first five episodes. If you want to run the final episode, which deploys your model to an endpoint, you will need access to an AWS account with an IAM role that can execute operations on SageMaker. We will explain these details in that episode. As always, feel free to reach out in the #ask-metaflow channel on Slack if you need help deploying Metaflow on your infrastructure! The estimated time to complete all episodes is 1-2 hours.

Why Metaflow?

The main benefit of using a data science workflow solution like Metaflow when prototyping is that your code is built on a strong foundation for deploying to a production environment. Metaflow is most useful when projects have scaling requirements, are mission-critical, and/or have many interacting parts.

After completing the lessons, you can transfer insights and code from the tutorial to your real-world data science projects. This is a beginner tutorial, so it does not reflect many important challenges you will face in production ML environments. For example, in production you may consider using Metaflow features such as the @conda decorator for dependency management, @batch or @kubernetes for remote execution, and @schedule to automatically trigger jobs.
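
As a rough sketch of how these decorators attach to steps and flows (this is not part of this tutorial's code; the flow name, library version, and resource sizes are illustrative assumptions):

from metaflow import FlowSpec, step, conda, batch, schedule

@schedule(daily=True)  # trigger the deployed flow automatically once a day
class ProductionizedFlow(FlowSpec):

    @conda(libraries={'gensim': '4.3.0'})  # pin per-step dependencies (version is illustrative)
    @batch(cpu=4, memory=16000)            # run this step remotely on AWS Batch
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    ProductionizedFlow()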

Recommender System Resources

  • A gentle introduction to RecSys concepts, with a focus on metrics and testing.
  • A research paper on the methodology (prod2vec) we adopted to solve the use case demonstrated in this tutorial: https://arxiv.org/abs/2007.14906.