Skip to main content

Beginner Recommender Systems: Episode 4

1Seamless parallelism with Metaflow

Once you have a functioning process for embedding features and a model that uses them for predictions, we can leverage Metaflow's built-in capabilities for parallelization to take this workflow to the next level. With a slight modification of our flow (note the foreach='hypers_sets' parameter), we can scale tasks that can be parallelized on as many machines as your cloud budget allows.

You can follow along with these modifications in the flow code for this episode. The flow includes tasks like processing data, or in this case hyperparameter tuning. The same logic applies as the flow in the previous episode, but now we tune over many embedding spaces in parallel and pick the best one (on the validation set) to use as our candidate model. As before, we then test our candidate model once again on the held-out set, to give us a sense of its generalization abilities.

2Organize Metaflow run results with cards

We can use Metaflow card abstractions to create cards to document specific components of the pipeline in a versioned, shareable format. For example, @card(type='blank', id='hyperCard') records the performance for all the models we trained. If you wish to use a separate tool for tracking experiments, you can leverage Metaflow integrations with tools like Comet ML and Weights and Biases.

3Find better models with a hyperparemeter tuning flow

In the following code you will see the RecSysTuningFlow. This flow is nearly identical to the previous one. The changes include:

  • Metaflow's foreach pattern at the end of the prepare_dataset step. The line self.next(self.generate_embeddings, foreach='hypers_sets') indicates that the generate_embeddings step will be created for all the combinations of parameters defined in the self.hypers_sets variable.
  • The join_runs step is required (it doesn't have to be any particular name, but the "join" step concept needs to exist) to merge all of the artifacts produced in the parallel generate_embeddings steps. Notice this function receives the inputs argument, containing results for each of the embeddings evaluated. This step includes the card that organizes the results of the parameter combinations and the validation metric scores they produced.

recsys_tuning_flow.py
from metaflow import FlowSpec, step, S3, Parameter, current, card
from metaflow.cards import Markdown, Table
import os
import json
import time
from random import choice


class RecSysTuningFlow(FlowSpec):

IS_DEV = Parameter(
name='is_dev',
help='Flag for dev development, with a smaller dataset',
default='1'
)

KNN_K = Parameter(
name='knn_k',
help='Number of neighbors we retrieve from the vector space',
default='100'
)

@step
def start(self):
print("flow name: %s" % current.flow_name)
print("run id: %s" % current.run_id)
print("username: %s" % current.username)
if self.IS_DEV == '1':
print("ATTENTION: RUNNING AS DEV VERSION - DATA WILL BE SUB-SAMPLED!!!")
self.next(self.prepare_dataset)

@step
def prepare_dataset(self):
"""
Get the data in the right shape by reading the parquet dataset
and using duckdb SQL-based wrangling to quickly prepare the datasets for
training our Recommender System.
"""
import duckdb
import numpy as np
con = duckdb.connect(database=':memory:')

con.execute("""
CREATE TABLE playlists AS
SELECT *,
CONCAT (user_id, '-', playlist) as playlist_id,
CONCAT (artist, '|||', track) as track_id,
FROM 'cleaned_spotify_dataset.parquet'
;
""")
con.execute("SELECT * FROM playlists LIMIT 1;")
print(con.fetchone())

tables = ['row_id', 'user_id', 'track_id', 'playlist_id', 'artist']
for t in tables:
con.execute("SELECT COUNT(DISTINCT({})) FROM playlists;".format(t))
print("# of {}".format(t), con.fetchone()[0])

sampling_cmd = ''
if self.IS_DEV == '1':
print("Subsampling data, since this is DEV")
sampling_cmd = ' USING SAMPLE 1 PERCENT (bernoulli)'

dataset_query = """
SELECT * FROM
(
SELECT
playlist_id,
LIST(artist ORDER BY row_id ASC) as artist_sequence,
LIST(track_id ORDER BY row_id ASC) as track_sequence,
array_pop_back(LIST(track_id ORDER BY row_id ASC)) as track_test_x,
LIST(track_id ORDER BY row_id ASC)[-1] as track_test_y
FROM
playlists
GROUP BY playlist_id
HAVING len(track_sequence) > 2
)
{}
;
""".format(sampling_cmd)

con.execute(dataset_query)
df = con.fetch_df()
print("# rows: {}".format(len(df)))
print(df.iloc[0].tolist())
con.close()

train, validate, test = np.split(
df.sample(frac=1, random_state=42),
[int(.7 * len(df)), int(.9 * len(df))])

self.df_dataset = df
self.df_train = train
self.df_validate = validate
self.df_test = test
print("# testing rows: {}".format(len(self.df_test)))

self.hypers_sets = [json.dumps(_) for _ in [
{ 'min_count': 5, 'epochs': 30, 'vector_size': 48, 'window': 10, 'ns_exponent': 0.75 },
{ 'min_count': 10, 'epochs': 30, 'vector_size': 48, 'window': 10, 'ns_exponent': 0.75 }
]]
# we train K models in parallel, depending how many configurations of hypers
# we set - we generate K set of vectors, and evaluate them on the validation
# set to pick the best combination of parameters!
self.next(self.generate_embeddings, foreach='hypers_sets')

def predict_next_track(self, vector_space, input_sequence, k):
"""
Given an embedding space, predict best next song with KNN.
Initially, we just take the LAST item in the input playlist as the query item for KNN
and retrieve the top K nearest vectors (you could think of taking the smoothed average embedding
of the input list, for example, as a refinement).

If the query item is not in the vector space, we make a random bet. We could refine this by taking
for example the vector of the artist (average of all songs), or with some other strategy (sampling
by popularity).

For more options on how to generate vectors for "cold items" see for example the paper:
https://dl.acm.org/doi/10.1145/3383313.3411477
"""
query_item = input_sequence[-1]
if query_item not in vector_space:
query_item = choice(list(vector_space.index_to_key))

return [_[0] for _ in vector_space.most_similar(query_item, topn=k)]

def evaluate_model(self, _df, vector_space, k):
lambda_predict = lambda row: self.predict_next_track(vector_space, row['track_test_x'], k)
_df['predictions'] = _df.apply(lambda_predict, axis=1)
lambda_hit = lambda row: 1 if row['track_test_y'] in row['predictions'] else 0
_df['hit'] = _df.apply(lambda_hit, axis=1)
hit_rate = _df['hit'].sum() / len(_df)
return hit_rate

@step
def generate_embeddings(self):
"""
Generate vector representations for songs, based on the Prod2Vec idea.

For an overview of the algorithm and the evaluation, see for example:
https://arxiv.org/abs/2007.14906
"""
from gensim.models.word2vec import Word2Vec
self.hyper_string = self.input
self.hypers = json.loads(self.hyper_string)
track2vec_model = Word2Vec(self.df_train['track_sequence'], **self.hypers)
print("Training with hypers {} is completed!".format(self.hyper_string))
print("Vector space size: {}".format(len(track2vec_model.wv.index_to_key)))
test_track = choice(list(track2vec_model.wv.index_to_key))
print("Example track: '{}'".format(test_track))
test_vector = track2vec_model.wv[test_track]
print("Test vector for '{}': {}".format(test_track, test_vector[:5]))
test_sims = track2vec_model.wv.most_similar(test_track, topn=3)
print("Similar songs to '{}': {}".format(test_track, test_sims))
self.validation_metric = self.evaluate_model(
self.df_validate,
track2vec_model.wv,
k=int(self.KNN_K))
print("Hit Rate@{} is: {}".format(self.KNN_K, self.validation_metric))
self.track_vectors = track2vec_model.wv
self.next(self.join_runs)

@card(type='blank', id='hyperCard')
@step
def join_runs(self, inputs):
"""
Join the parallel runs and merge results into a dictionary.
"""
self.all_vectors = { inp.hyper_string: inp.track_vectors for inp in inputs}
self.all_results = { inp.hyper_string: inp.validation_metric for inp in inputs}
print("Current result map: {}".format(self.all_results))
self.best_model, self_best_result = sorted(self.all_results.items(), key=lambda x: x[1], reverse=True)[0]
print("The best validation score is for model: {}, {}".format(self.best_model, self_best_result))
self.final_vectors = self.all_vectors[self.best_model]
self.final_dataset = inputs[0].df_test
current.card.append(Markdown("## Results from parallel training"))
current.card.append(
Table([
[inp.hyper_string, inp.validation_metric] for inp in inputs
])
)
# next, test the best model on unseen data, and report the final Hit Rate as
# our best point-wise estimate of "in the wild" performance
self.next(self.model_testing)

@step
def model_testing(self):
"""
Test the generalization abilities of the best model by running predictions
on the unseen test data.

We report a quantitative point-wise metric, hit rate @ K, as an initial implementation. However,
evaluating recommender systems is a very complex task, and better metrics, through good abstractions,
are available, i.e. https://reclist.io/.
"""
self.test_metric = self.evaluate_model(
self.final_dataset,
self.final_vectors,
k=int(self.KNN_K))
print("Hit Rate@{} on the test set is: {}".format(self.KNN_K, self.test_metric))
self.next(self.end)

@step
def end(self):
"""
Just say bye!
"""
print("All done\n\nSee you, space cowboy\n")
return


if __name__ == '__main__':
RecSysTuningFlow()

3Run your flow

python recsys_tuning_flow.py run --is_dev 0
     Workflow starting (run-id 188134):
[188134/start/1014307 (pid 87579)] Task is starting.
[188134/start/1014307 (pid 87579)] flow name: RecSysTuningFlow
[188134/start/1014307 (pid 87579)] run id: 188134
[188134/start/1014307 (pid 87579)] username: eddie
[188134/start/1014307 (pid 87579)] Task finished successfully.
[188134/prepare_dataset/1014308 (pid 87583)] Task is starting.
[188134/prepare_dataset/1014308 (pid 87583)] (0, '9cc0cfd4d7d7885102480dd99e7a90d6', 'Elvis Costello', '(The Angels Wanna Wear My) Red Shoes', 'HARD ROCK 2010', '9cc0cfd4d7d7885102480dd99e7a90d6-HARD ROCK 2010', 'Elvis Costello|||(The Angels Wanna Wear My) Red Shoes')
[188134/prepare_dataset/1014308 (pid 87583)] # of row_id 12891680
[188134/prepare_dataset/1014308 (pid 87583)] # of user_id 15918
[188134/prepare_dataset/1014308 (pid 87583)] # of track_id 2819059
[188134/prepare_dataset/1014308 (pid 87583)] # of playlist_id 232369
[188134/prepare_dataset/1014308 (pid 87583)] # of artist 289821
[188134/prepare_dataset/1014308 (pid 87583)] # rows: 217843
[188134/prepare_dataset/1014308 (pid 87583)] ['b741ad9438bcf049085e58aa184a4be1-EA Sports - MVP Baseball 2005 Soundtrack!', ['The Bravery', 'Louis XIV', 'Rock n Roll Soldiers', '...And You Will Know Us By The Trail Of Dead', 'Dropkick Murphys', 'The High Speed Scene', 'Steriogram', 'Hot Hot Heat'], ['The Bravery|||An Honest Mistake', 'Louis XIV|||Finding Out True Love Is Blind - Album/EP Version', 'Rock n Roll Soldiers|||Funny Little Feeling', '...And You Will Know Us By The Trail Of Dead|||Let It Dive', 'Dropkick Murphys|||Tessie (Radio Version)', 'The High Speed Scene|||The I Roc Z Song', 'Steriogram|||Walkie Talkie Man', 'Hot Hot Heat|||You Owe Me An IOU'], ['The Bravery|||An Honest Mistake', 'Louis XIV|||Finding Out True Love Is Blind - Album/EP Version', 'Rock n Roll Soldiers|||Funny Little Feeling', '...And You Will Know Us By The Trail Of Dead|||Let It Dive', 'Dropkick Murphys|||Tessie (Radio Version)', 'The High Speed Scene|||The I Roc Z Song', 'Steriogram|||Walkie Talkie Man'], 'Hot Hot Heat|||You Owe Me An IOU']
[188134/prepare_dataset/1014308 (pid 87583)] # testing rows: 21785
[188134/prepare_dataset/1014308 (pid 87583)] Foreach yields 2 child steps.
[188134/prepare_dataset/1014308 (pid 87583)] Task finished successfully.
[188134/generate_embeddings/1014309 (pid 87633)] Task is starting.
[188134/generate_embeddings/1014310 (pid 87636)] Task is starting.
[188134/generate_embeddings/1014310 (pid 87636)] Training with hypers {"min_count": 10, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75} is completed!
[188134/generate_embeddings/1014310 (pid 87636)] Vector space size: 148016
[188134/generate_embeddings/1014310 (pid 87636)] Example track: 'Buddy Holly|||Rock Around With Ollie Vee'
[188134/generate_embeddings/1014310 (pid 87636)] Test vector for 'Buddy Holly|||Rock Around With Ollie Vee': [-0.57586724 0.25871253 1.0829923 -0.40932345 1.0014604 ]
[188134/generate_embeddings/1014310 (pid 87636)] Similar songs to 'Buddy Holly|||Rock Around With Ollie Vee': [('Duane Eddy|||Rebel Rouser', 0.9402039051055908), ('Carl Perkins|||Put Your Cat Clothes On', 0.936795175075531), ('Buddy Holly|||Ready Teddy', 0.9180619716644287)]
[188134/generate_embeddings/1014309 (pid 87633)] Training with hypers {"min_count": 5, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75} is completed!
2 tasks are running: generate_embeddings (2 running; 0 done).
No tasks are waiting in the queue.
3 steps have not started: join_runs, end, model_testing.
[188134/generate_embeddings/1014309 (pid 87633)] Vector space size: 316725
[188134/generate_embeddings/1014309 (pid 87633)] Example track: 'Bananarama|||Long Train Running [7-inch version]'
[188134/generate_embeddings/1014309 (pid 87633)] Test vector for 'Bananarama|||Long Train Running [7-inch version]': [-0.11822549 -0.00500495 0.5621464 0.2686343 0.64628214]
[188134/generate_embeddings/1014309 (pid 87633)] Similar songs to 'Bananarama|||Long Train Running [7-inch version]': [('Bananarama|||Long Train Running', 0.9727321863174438), ('Belinda Carlisle|||Love In The Key Of C', 0.9349488019943237), ('Tröckener Kecks|||Met Hart En Ziel', 0.930341899394989)]
2 tasks are running: generate_embeddings (2 running; 0 done).
No tasks are waiting in the queue.
3 steps have not started: join_runs, end, model_testing.
[188134/generate_embeddings/1014310 (pid 87636)] Hit Rate@100 is: 0.17409566654425265
[188134/generate_embeddings/1014310 (pid 87636)] Task finished successfully.
[188134/generate_embeddings/1014309 (pid 87633)] Hit Rate@100 is: 0.1834373852368711
1 task is running: generate_embeddings (1 running; 1 done).
No tasks are waiting in the queue.
3 steps have not started: join_runs, end, model_testing.
[188134/generate_embeddings/1014309 (pid 87633)] Task finished successfully.
[188134/join_runs/1014311 (pid 89559)] Task is starting.
[188134/join_runs/1014311 (pid 89559)] Current result map: {'{"min_count": 5, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75}': 0.1834373852368711, '{"min_count": 10, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75}': 0.17409566654425265}
[188134/join_runs/1014311 (pid 89559)] The best validation score is for model: {"min_count": 5, "epochs": 30, "vector_size": 48, "window": 10, "ns_exponent": 0.75}, 0.1834373852368711
[188134/join_runs/1014311 (pid 89559)] Task finished successfully.
[188134/model_testing/1014312 (pid 89566)] Task is starting.
[188134/model_testing/1014312 (pid 89566)] Hit Rate@100 on the test set is: 0.18613725040165252
[188134/model_testing/1014312 (pid 89566)] Task finished successfully.
[188134/end/1014313 (pid 89584)] Task is starting.
[188134/end/1014313 (pid 89584)] All done
[188134/end/1014313 (pid 89584)]
[188134/end/1014313 (pid 89584)] See you, space cowboy
[188134/end/1014313 (pid 89584)]
[188134/end/1014313 (pid 89584)] Task finished successfully.
Done!

Now you have a flow that will not only help you operationalize the training of your model but can help you train many variations in parallel while seamlessly tracking the results in a variety of modes. Stay tuned for the next episode where you will learn to use Metaflow's Client API to access the models you trained in Python code.