
Natural Language Processing - Episode 5

This episode references the Python script nlpflow.py.

In the previous episode, you saw how we trained a model and compared it to a baseline. But what if your model is worse than the baseline? Is there a way to manage this situation programmatically? One Metaflow feature that enables this is tagging. Tags let you categorize and organize the runs of a flow, and we can use them to mark certain models as "deployment candidates." At the end of this lesson, you will be able to:

  • Collaborate on and organize flows with tagging.
  • Implement common design patterns for testing machine learning models.

1. What is Tagging?

Tags allow you to express opinions about the results of your work and your colleagues' work and, importantly, to change those assessments at any time. Runs and artifacts represent immutable facts (history shouldn't be rewritten), but the way you interpret those facts may change over time, and tags reflect that. This makes tags ideal for managing which models are promoted to the next step in your modeling workflow.

You can tag a run with only a few lines of code. Below is the snippet we will use inside our flow to tag the current run:

from metaflow import Flow, current
run = Flow(current.flow_name)[current.run_id]
run.add_tag('deployment_candidate')
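Because tags are mutable, the same kind of Run object can later be used to revise an assessment. The following is a minimal sketch, assuming a Metaflow release whose Run objects expose the `user_tags` property and the `replace_tag` method alongside `add_tag`. The helper name `retag` and the tag `rejected` are hypothetical and are not part of this tutorial's flow.

```python
def retag(run, old_tag, new_tag):
    """Swap one user tag for another on a Metaflow Run-like object.

    `retag` is a hypothetical helper. It relies only on the `user_tags`
    property and the `replace_tag` method of the object it is given.
    Returns True if a tag was swapped, False if `old_tag` was absent.
    """
    if old_tag not in run.user_tags:
        return False  # nothing to change
    run.replace_tag(old_tag, new_tag)
    return True

# Possible usage outside of a flow (assumes a flow named NLPFlow has run):
# from metaflow import Flow
# retag(Flow('NLPFlow').latest_run, 'deployment_candidate', 'rejected')
```

The run itself and its artifacts stay untouched; only the opinion attached to the run changes.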

2. Write a Flow

In this flow, we modify our end step to apply the tag deployment_candidate if our model passes two tests: (1) a baseline comparison and (2) a smoke test.

Concretely, we will add the following to the end step:

  1. A smoke test that checks the model's predictions on very easy examples that it should never get wrong. A smoke test is a lightweight way to catch unexpected behavior in your model, even if your model is beating the baseline.
  2. A comparison of the model with the baseline. We are going to check whether our model's AUC score is higher than the baseline's. There are more advanced variations on this technique, including using other models as baselines, or requiring that your model beat the baseline by a specific margin. We leave these variations as an exercise for the reader.
  3. Logic that adds the tag only if our model both passes the smoke test and beats the baseline.
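One of the variations mentioned above, requiring the model to beat the baseline by a margin, can be sketched as a small pure function. This is only an illustration: the name `should_promote` and the `margin` parameter are hypothetical, and the flow in this lesson uses the plain "strictly better" comparison.

```python
def should_promote(model_auc, baseline_auc, passed_smoke_test, margin=0.0):
    """Return True when the model clears both gates.

    `margin` is a hypothetical knob: the minimum amount by which the
    model's AUC must exceed the baseline's AUC. With margin=0.0 this is
    the plain "strictly better than the baseline" check.
    """
    beats_baseline = (model_auc - baseline_auc) > margin
    return bool(beats_baseline and passed_smoke_test)
```

Keeping the decision in one function makes it easy to unit test the promotion rule separately from the flow.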

nlpflow.py
from metaflow import FlowSpec, step, Flow, current

class NLPFlow(FlowSpec):

    @step
    def start(self):
        "Read the data"
        import pandas as pd
        self.df = pd.read_parquet('train.parquet')
        self.valdf = pd.read_parquet('valid.parquet')
        print(f'num of rows: {self.df.shape[0]}')
        self.next(self.baseline, self.train)

    @step
    def baseline(self):
        "Compute the baseline"
        from sklearn.metrics import accuracy_score, roc_auc_score
        baseline_predictions = [1] * self.valdf.shape[0]
        self.base_acc = accuracy_score(
            self.valdf.labels, baseline_predictions)
        self.base_rocauc = roc_auc_score(
            self.valdf.labels, baseline_predictions)
        self.next(self.join)

    @step
    def train(self):
        "Train the model"
        from model import NbowModel
        model = NbowModel(vocab_sz=750)
        model.fit(X=self.df['review'], y=self.df['labels'])
        self.model_dict = model.model_dict #save model
        self.next(self.join)

    @step
    def join(self, inputs):
        "Compare the model results with the baseline."
        import pandas as pd
        from model import NbowModel
        self.model_dict = inputs.train.model_dict
        self.train_df = inputs.train.df
        self.val_df = inputs.baseline.valdf
        self.base_rocauc = inputs.baseline.base_rocauc
        self.base_acc = inputs.baseline.base_acc
        model = NbowModel.from_dict(self.model_dict)

        self.model_acc = model.eval_acc(
            X=self.val_df['review'], labels=self.val_df['labels'])
        self.model_rocauc = model.eval_rocauc(
            X=self.val_df['review'], labels=self.val_df['labels'])

        print(f'Baseline Acccuracy: {self.base_acc:.2%}')
        print(f'Baseline AUC: {self.base_rocauc:.2}')
        print(f'Model Acccuracy: {self.model_acc:.2%}')
        print(f'Model AUC: {self.model_rocauc:.2}')
        self.next(self.end)

    @step
    def end(self):
        """Tags model as a deployment candidate
        if it beats the baseline and passes smoke tests."""
        from model import NbowModel
        model = NbowModel.from_dict(self.model_dict)

        self.beats_baseline = self.model_rocauc > self.base_rocauc
        print(f'Model beats baseline (T/F): {self.beats_baseline}')
        #smoke test to make sure model does the right thing.
        _tst_reviews = [
            "poor fit its baggy in places where it isn't supposed to be.",
            "love it, very high quality and great value"
        ]
        _tst_preds = model.predict(_tst_reviews)
        check_1 = _tst_preds[0][0] < .5
        check_2 = _tst_preds[1][0] > .5
        self.passed_smoke_test = check_1 and check_2
        msg = 'Model passed smoke test (T/F): {}'
        print(msg.format(self.passed_smoke_test))

        if self.beats_baseline and self.passed_smoke_test:
            run = Flow(current.flow_name)[current.run_id]
            run.add_tag('deployment_candidate')


if __name__ == '__main__':
    NLPFlow()
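The smoke test in the end step hard-codes two reviews and two checks. As the list of easy examples grows, it may be convenient to factor the check into a helper. The sketch below is one way to do that, not the tutorial's implementation. The name `passes_smoke_test` is hypothetical, and it assumes the prediction function returns one row per input with the positive-class score at index 0, which is how the flow indexes the output of `model.predict`.

```python
def passes_smoke_test(predict, cases, threshold=0.5):
    """Check easy examples against a prediction function.

    predict: callable that takes a list of strings and returns one row
             of scores per input, positive-class score at index 0
             (an assumption based on how the flow indexes predictions).
    cases:   list of (text, expect_positive) pairs.
    """
    texts = [text for text, _ in cases]
    preds = predict(texts)
    for row, (_, expect_positive) in zip(preds, cases):
        score = row[0]
        # Mirror the flow's checks: > threshold for positive examples,
        # < threshold for negative ones.
        ok = score > threshold if expect_positive else score < threshold
        if not ok:
            return False
    return True

# Possible usage inside the end step:
# cases = [
#     ("poor fit its baggy in places where it isn't supposed to be.", False),
#     ("love it, very high quality and great value", True),
# ]
# self.passed_smoke_test = passes_smoke_test(model.predict, cases)
```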

3. Run the Flow

python nlpflow.py run
     Workflow starting (run-id 1666721209729488):
[1666721209729488/start/1 (pid 53130)] Task is starting.
[1666721209729488/start/1 (pid 53130)] num of rows: 20377
[1666721209729488/start/1 (pid 53130)] Task finished successfully.
[1666721209729488/baseline/2 (pid 53133)] Task is starting.
[1666721209729488/train/3 (pid 53134)] Task is starting.
[1666721209729488/baseline/2 (pid 53133)] Task finished successfully.
[1666721209729488/train/3 (pid 53134)] 372: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
[1666721209729488/train/3 (pid 53134)] Epoch 1/10
510/510 [==============================] - 1s 832us/step - loss: 0.3514 - accuracy: 0.8536 - val_loss: 0.2995 - val_accuracy: 0.8732
[1666721209729488/train/3 (pid 53134)] Epoch 2/10
510/510 [==============================] - 0s 654us/step - loss: 0.2987 - accuracy: 0.8770 - val_loss: 0.2958 - val_accuracy: 0.8751
[1666721209729488/train/3 (pid 53134)] Epoch 3/10
510/510 [==============================] - 0s 657us/step - loss: 0.2837 - accuracy: 0.8872 - val_loss: 0.2941 - val_accuracy: 0.8756
[1666721209729488/train/3 (pid 53134)] Epoch 4/10
510/510 [==============================] - 0s 661us/step - loss: 0.2746 - accuracy: 0.8910 - val_loss: 0.2993 - val_accuracy: 0.8756
[1666721209729488/train/3 (pid 53134)] Epoch 5/10
510/510 [==============================] - 0s 653us/step - loss: 0.2686 - accuracy: 0.8941 - val_loss: 0.3024 - val_accuracy: 0.8778
[1666721209729488/train/3 (pid 53134)] Epoch 6/10
510/510 [==============================] - 0s 657us/step - loss: 0.2591 - accuracy: 0.9000 - val_loss: 0.3013 - val_accuracy: 0.8754
[1666721209729488/train/3 (pid 53134)] Epoch 7/10
510/510 [==============================] - 0s 701us/step - loss: 0.2484 - accuracy: 0.9051 - val_loss: 0.3086 - val_accuracy: 0.8786
[1666721209729488/train/3 (pid 53134)] Epoch 8/10
510/510 [==============================] - 0s 664us/step - loss: 0.2380 - accuracy: 0.9112 - val_loss: 0.3232 - val_accuracy: 0.8702
[1666721209729488/train/3 (pid 53134)] Epoch 9/10
510/510 [==============================] - 0s 661us/step - loss: 0.2327 - accuracy: 0.9171 - val_loss: 0.3247 - val_accuracy: 0.8685
[1666721209729488/train/3 (pid 53134)] Epoch 10/10
510/510 [==============================] - 0s 655us/step - loss: 0.2220 - accuracy: 0.9221 - val_loss: 0.3326 - val_accuracy: 0.8680
[1666721209729488/train/3 (pid 53134)] WARNING:absl:Function `_wrapped_model` contains input name(s) Input with unsupported characters which will be renamed to input in the SavedModel.
[1666721209729488/train/3 (pid 53134)] Task finished successfully.
[1666721209729488/join/4 (pid 53140)] Task is starting.
[1666721209729488/join/4 (pid 53140)] 185: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
71/71 [==============================] - 0s 343us/step
71/71 [==============================] - 0s 348us/step
[1666721209729488/join/4 (pid 53140)] Baseline Acccuracy: 77.30%
[1666721209729488/join/4 (pid 53140)] WARNING:absl:Function `_wrapped_model` contains input name(s) Input with unsupported characters which will be renamed to input in the SavedModel.
[1666721209729488/join/4 (pid 53140)] Baseline AUC: 0.5
[1666721209729488/join/4 (pid 53140)] Model Acccuracy: 87.28%
[1666721209729488/join/4 (pid 53140)] Model AUC: 0.92
[1666721209729488/join/4 (pid 53140)] Task finished successfully.
[1666721209729488/end/5 (pid 53148)] Task is starting.
[1666721209729488/end/5 (pid 53148)] Model beats baseline (T/F): True
[1666721209729488/end/5 (pid 53148)] 108: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
1/1 [==============================] - 0s 33ms/step
[1666721209729488/end/5 (pid 53148)] Model passed smoke test (T/F): True
[1666721209729488/end/5 (pid 53148)] WARNING:absl:Function `_wrapped_model` contains input name(s) Input with unsupported characters which will be renamed to input in the SavedModel.
[1666721209729488/end/5 (pid 53148)] Task finished successfully.
Done!
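Once a run like this has finished, tagged runs can be looked up from a notebook or another script. The following is a hedged sketch rather than part of nlpflow.py: `latest_candidate` is a hypothetical helper, and it assumes the Metaflow Client API behavior in which `Flow.runs(tag)` filters runs by tag and yields the newest runs first.

```python
def latest_candidate(flow, tag="deployment_candidate"):
    """Return the first successful run carrying `tag`, or None.

    `flow` is expected to behave like a metaflow.Flow object: `runs(tag)`
    yields runs with that tag (assumed newest first), and each run has a
    boolean `successful` attribute.
    """
    for run in flow.runs(tag):
        if run.successful:
            return run
    return None

# Possible usage (assumes NLPFlow has been run at least once):
# from metaflow import Flow
# run = latest_candidate(Flow("NLPFlow"))
# if run is not None:
#     model_dict = run.data.model_dict
```

Filtering on `successful` skips runs that were tagged but are still in progress or later failed.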

Now that the flow tags any model that meets our minimum standards, we are ready to use that model in downstream workflows. In the next lesson, we will explore different ways you can utilize the model you have trained.