Natural Language Processing - Episode 7

In Episode 5, you saw how to train a model and tag the model if it passed certain tests to indicate that it was ready for downstream processes. In this episode, you will retrieve this model for use in other flows. You will use this model to make predictions on a batch of data, and store those predictions somewhere they can be used in downstream applications.

1. Use your Trained Model in a Prediction Flow

With the Metaflow client API, you can retrieve your artifacts in whatever downstream application you want, or even just use the API for ad-hoc testing.

You can also use the client API to retrieve model artifacts within a flow.
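For ad-hoc testing outside a flow, the same lookup can be done from a notebook or script. The sketch below is one way to do it, assuming that runs of `NLPFlow` tagged `deployment_candidate` exist in your Metaflow datastore. The helper names `first_successful` and `get_deployment_candidate` are illustrative and not part of Metaflow.

```python
def first_successful(runs):
    """Return the first run whose `successful` flag is set, or None.

    Metaflow's client lists runs newest first, so the first match
    is also the most recent one.
    """
    for run in runs:
        if run.successful:
            return run
    return None


def get_deployment_candidate(flow_name="NLPFlow", tag="deployment_candidate"):
    """Look up the newest successful run of `flow_name` carrying `tag`."""
    # Imported lazily so the pure helper above works without Metaflow installed.
    from metaflow import Flow

    run = first_successful(Flow(flow_name).runs(tag))
    if run is None:
        raise LookupError(f"No successful run of {flow_name} tagged {tag!r}")
    return run


# Example usage (requires Metaflow and an existing tagged run):
#   candidate = get_deployment_candidate()
#   print(candidate.id)      # run id of the deployment candidate
#   artifacts = candidate.data  # artifacts the run saved on `self`
```

Raising an explicit error when no run matches avoids passing `None` on to later code, where it would fail with a less helpful message.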

This flow contains the following steps:

  1. Get the latest deployment candidate using the Metaflow API in the start step. Recall that the name of our previous flow is NLPFlow.
  2. Make predictions with our deployment candidate on a new dataset and write them to a parquet file in the batch_predict step.
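from metaflow import FlowSpec, step, Flow, current, Parameter

class BatchPredict(FlowSpec):

    # NOTE: the parameter name 'uri' is an assumption (it was lost from
    # this listing); the default matches the path printed in the run output.
    prediction_storage_uri = Parameter(
        'uri',
        default="sentiment_predictions.parquet",
        help="Where to park your predictions for other apps to consume."
    )

    def get_latest_successful_run(self, flow_nm, tag):
        """Gets the latest successful run
        for a flow with a specific tag."""
        # Runs are listed newest first, so the first match is the latest.
        for r in Flow(flow_nm).runs(tag):
            if r.successful:
                return r

    @step
    def start(self):
        """Get the latest deployment candidate
        that is from a successful run"""
        self.deploy_run = self.get_latest_successful_run(
            'NLPFlow', 'deployment_candidate')
        self.next(self.batch_predict)

    @step
    def batch_predict(self):
        "Make predictions"
        from model import NbowModel
        import pandas as pd
        import pyarrow as pa
        import pyarrow.parquet as pq

        # NOTE: the input path and column were lost from this listing;
        # 'predict.parquet' and 'review' are assumptions.
        new_reviews = pd.read_parquet('predict.parquet')['review']

        # Make predictions
        # NOTE: the artifact name `model_dict` is an assumption about
        # what NLPFlow saved.
        model = NbowModel.from_dict(self.deploy_run.data.model_dict)
        predictions = model.predict(new_reviews)
        msg = 'Writing predictions to parquet: {} rows'
        print(msg.format(predictions.shape[0]))
        pa_tbl = pa.table({"data": predictions.squeeze()})
        pq.write_table(pa_tbl, self.prediction_storage_uri)
        self.next(self.end)

    @step
    def end(self):
        print(f"\n\nAll done! Now you can read your predictions at {self.prediction_storage_uri}.\n\n")

if __name__ == '__main__':
    BatchPredict()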

2. Run the Prediction Flow

python run
     Workflow starting (run-id 1680367095362550):
[1680367095362550/start/1 (pid 45395)] Task is starting.
[1680367095362550/start/1 (pid 45395)] Task finished successfully.
[1680367095362550/batch_predict/2 (pid 45398)] Task is starting.
[1680367095362550/batch_predict/2 (pid 45398)] 073: W tensorflow/core/platform/profile_utils/] Failed to get CPU frequency: 0 Hz
[1680367095362550/batch_predict/2 (pid 45398)] Writing predictions to parquet: 2264 rows
[1680367095362550/batch_predict/2 (pid 45398)] Task finished successfully.
[1680367095362550/end/3 (pid 45401)] Task is starting.
[1680367095362550/end/3 (pid 45401)]
[1680367095362550/end/3 (pid 45401)]
[1680367095362550/end/3 (pid 45401)] All done! Now you can read your predictions at sentiment_predictions.parquet.
[1680367095362550/end/3 (pid 45401)]
[1680367095362550/end/3 (pid 45401)]
[1680367095362550/end/3 (pid 45401)] Task finished successfully.
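
Once the run has finished, a downstream application can read the predictions back from the parquet file. The sketch below is a minimal example, assuming the file has a single `data` column (as written by `pa.table({"data": ...})` in the flow) and that each value is a score where higher means more positive sentiment. The function names and the 0.5 threshold are illustrative assumptions, not part of this tutorial's code.

```python
def load_predictions(uri="sentiment_predictions.parquet"):
    """Read the `data` column written by the flow into a Python list."""
    # Imported lazily so the summary helper below has no hard dependency.
    import pyarrow.parquet as pq

    table = pq.read_table(uri)
    return table.column("data").to_pylist()


def summarize(scores, threshold=0.5):
    """Count how many scores fall at or above `threshold` (assumed cutoff)."""
    positive = sum(1 for s in scores if s >= threshold)
    return {
        "rows": len(scores),
        "positive": positive,
        "negative": len(scores) - positive,
    }


# Example usage after a successful run:
#   scores = load_predictions()
#   print(summarize(scores))
```

Keeping the file reading separate from the summary logic makes it easier to swap the local path for another storage location later.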

The batch prediction pattern is a good way to start deploying a new machine learning system. As a next step, consider writing these predictions to cloud storage such as Google Cloud Storage or Amazon S3. If you need high availability and partition tolerance, you could write predictions to a system like AWS DynamoDB or Azure Cosmos DB.

The batch pattern is desirable in many machine learning contexts, such as when your ML service can update its predictions on a schedule instead of in real time, and when you expect to support many users with low-latency predictions. In other cases, you may want to put your model behind an endpoint so you can make granular requests to the model in real time. In the next lesson, we will put a model behind an endpoint using FastAPI. See you there!