
Natural Language Processing - Episode 4

This episode references the Python script branchflow.py.

In the previous episode, you saw how we constructed a basic flow to compute the baseline for our NLP task. In this lesson, we will incorporate the model and show you how to use branching to compute things in parallel. At the end of this lesson, you will be able to:

  • Refactor training code into a flow.
  • Process data and train models in parallel with branching.

1. What is Branching?

Branching is a powerful feature in Metaflow that allows you to complete steps in parallel instead of in a linear fashion. To demonstrate this feature, we will construct our baseline and train steps as two branches that execute in parallel. Note that anytime you use branching, you also need a join step to disambiguate the artifacts produced by the branches, which you can read more about in the Metaflow documentation.
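For illustration, here is a minimal, hypothetical flow (the names MinimalBranchFlow, left, and right are ours, not part of this episode's code) showing the branch-and-join pattern:

from metaflow import FlowSpec, step

class MinimalBranchFlow(FlowSpec):

    @step
    def start(self):
        # Listing two steps in self.next fans out into parallel branches.
        self.next(self.left, self.right)

    @step
    def left(self):
        self.x = 'computed in left'
        self.next(self.join)

    @step
    def right(self):
        self.x = 'computed in right'
        self.next(self.join)

    @step
    def join(self, inputs):
        # `inputs` exposes each branch by step name, so we can
        # disambiguate the conflicting values of self.x.
        print(inputs.left.x, inputs.right.x)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    MinimalBranchFlow()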

2. Write a Flow

In this flow, we will modify the start and join steps to achieve branching, as well as add a train step that will train our model.

Below is a detailed explanation of the changes we are making to our original flow:

  1. Create a branching workflow that builds a baseline and a candidate model in parallel in the baseline and train steps.
    • When we call self.next(self.baseline, self.train), this creates a branching flow that allows the baseline and train steps to run in parallel.
  2. Add a train step: The train step uses a neural bag-of-words model to train a text classifier.
    • We import the NbowModel class we created in Lesson 1.
    • We save this model in a special way by assigning our model's model_dict property to self.model_dict, which stores this data in Metaflow's artifact store, where it is versioned and saved automatically.
  3. Add a join step: In this step, we load our model using NbowModel.from_dict(self.model_dict) and disambiguate the data in our branches.
    • The join step can disambiguate data by referring to a specific step in the branch. For example, inputs.train.df refers to the train step, and specifically the df artifact stored in that step. (An alternative, sketched after this list, is Metaflow's merge_artifacts helper.)
    • We print the performance metrics of our model and the baseline in this join step.
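As an aside: when artifacts do not take conflicting values across branches, Metaflow's merge_artifacts helper can carry them all over in one call instead of assigning each by hand. Below is a minimal sketch of such a join step, assuming no conflicts (our reading of this flow: each artifact is either unchanged or set in only one branch):

@step
def join(self, inputs):
    # Propagates every artifact whose value is unambiguous across the
    # branches; conflicting artifacts must be excluded or resolved manually.
    self.merge_artifacts(inputs)
    self.next(self.end)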

branchflow.py
from metaflow import FlowSpec, step

class BranchNLPFlow(FlowSpec):

    @step
    def start(self):
        "Read the data"
        import pandas as pd
        self.df = pd.read_parquet('train.parquet')
        self.valdf = pd.read_parquet('valid.parquet')
        print(f'num of rows: {self.df.shape[0]}')
        # Fan out into two parallel branches.
        self.next(self.baseline, self.train)

    @step
    def baseline(self):
        "Compute the baseline"
        from sklearn.metrics import accuracy_score, roc_auc_score
        # A majority-class baseline: predict the positive class everywhere.
        baseline_predictions = [1] * self.valdf.shape[0]
        self.base_acc = accuracy_score(
            self.valdf.labels, baseline_predictions)
        self.base_rocauc = roc_auc_score(
            self.valdf.labels, baseline_predictions)
        self.next(self.join)

    @step
    def train(self):
        "Train the model"
        from model import NbowModel
        model = NbowModel(vocab_sz=750)
        model.fit(X=self.df['review'], y=self.df['labels'])
        self.model_dict = model.model_dict  # save the model as an artifact
        self.next(self.join)

    @step
    def join(self, inputs):
        "Compare the model results with the baseline."
        from model import NbowModel
        # Disambiguate artifacts by naming the branch they came from.
        self.model_dict = inputs.train.model_dict
        self.train_df = inputs.train.df
        self.val_df = inputs.baseline.valdf
        self.base_rocauc = inputs.baseline.base_rocauc
        self.base_acc = inputs.baseline.base_acc
        model = NbowModel.from_dict(self.model_dict)

        self.model_acc = model.eval_acc(
            X=self.val_df['review'], labels=self.val_df['labels'])
        self.model_rocauc = model.eval_rocauc(
            X=self.val_df['review'], labels=self.val_df['labels'])

        print(f'Baseline Accuracy: {self.base_acc:.2%}')
        print(f'Baseline AUC: {self.base_rocauc:.2}')
        print(f'Model Accuracy: {self.model_acc:.2%}')
        print(f'Model AUC: {self.model_rocauc:.2}')
        self.next(self.end)

    @step
    def end(self):
        print('Flow is complete')


if __name__ == '__main__':
    BranchNLPFlow()

3. Run the Flow

python branchflow.py run
     Workflow starting (run-id 1666721142756833):
[1666721142756833/start/1 (pid 53077)] Task is starting.
[1666721142756833/start/1 (pid 53077)] num of rows: 20377
[1666721142756833/start/1 (pid 53077)] Task finished successfully.
[1666721142756833/baseline/2 (pid 53080)] Task is starting.
[1666721142756833/train/3 (pid 53081)] Task is starting.
[1666721142756833/baseline/2 (pid 53080)] Task finished successfully.
[1666721142756833/train/3 (pid 53081)] 319: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
[1666721142756833/train/3 (pid 53081)] Epoch 1/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 1s 846us/step - loss: 0.3544 - accuracy: 0.8510 - val_loss: 0.2970 - val_accuracy: 0.8759
[1666721142756833/train/3 (pid 53081)] Epoch 2/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 668us/step - loss: 0.2967 - accuracy: 0.8779 - val_loss: 0.2954 - val_accuracy: 0.8739
[1666721142756833/train/3 (pid 53081)] Epoch 3/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 667us/step - loss: 0.2853 - accuracy: 0.8857 - val_loss: 0.2966 - val_accuracy: 0.8776
[1666721142756833/train/3 (pid 53081)] Epoch 4/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 660us/step - loss: 0.2757 - accuracy: 0.8899 - val_loss: 0.2972 - val_accuracy: 0.8763
[1666721142756833/train/3 (pid 53081)] Epoch 5/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 660us/step - loss: 0.2692 - accuracy: 0.8945 - val_loss: 0.3018 - val_accuracy: 0.8778
[1666721142756833/train/3 (pid 53081)] Epoch 6/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 663us/step - loss: 0.2622 - accuracy: 0.8998 - val_loss: 0.3032 - val_accuracy: 0.8751
[1666721142756833/train/3 (pid 53081)] Epoch 7/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 669us/step - loss: 0.2536 - accuracy: 0.9037 - val_loss: 0.3063 - val_accuracy: 0.8763
[1666721142756833/train/3 (pid 53081)] Epoch 8/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 663us/step - loss: 0.2408 - accuracy: 0.9120 - val_loss: 0.3117 - val_accuracy: 0.8754
[1666721142756833/train/3 (pid 53081)] Epoch 9/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 659us/step - loss: 0.2309 - accuracy: 0.9176 - val_loss: 0.3267 - val_accuracy: 0.8705
[1666721142756833/train/3 (pid 53081)] Epoch 10/10
[1666721142756833/train/3 (pid 53081)] 510/510 [==============================] - 0s 668us/step - loss: 0.2260 - accuracy: 0.9223 - val_loss: 0.3341 - val_accuracy: 0.8734
[1666721142756833/train/3 (pid 53081)] WARNING:absl:Function `_wrapped_model` contains input name(s) Input with unsupported characters which will be renamed to input in the SavedModel.
[1666721142756833/train/3 (pid 53081)] Task finished successfully.
[1666721142756833/join/4 (pid 53087)] Task is starting.
[1666721142756833/join/4 (pid 53087)] 334: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
[1666721142756833/join/4 (pid 53087)] 71/71 [==============================] - 0s 330us/step
[1666721142756833/join/4 (pid 53087)] 71/71 [==============================] - 0s 304us/step
[1666721142756833/join/4 (pid 53087)] Baseline Accuracy: 77.30%
[1666721142756833/join/4 (pid 53087)] WARNING:absl:Function `_wrapped_model` contains input name(s) Input with unsupported characters which will be renamed to input in the SavedModel.
[1666721142756833/join/4 (pid 53087)] Baseline AUC: 0.5
[1666721142756833/join/4 (pid 53087)] Model Accuracy: 87.10%
[1666721142756833/join/4 (pid 53087)] Model AUC: 0.92
[1666721142756833/join/4 (pid 53087)] Task finished successfully.
[1666721142756833/end/5 (pid 53090)] Task is starting.
[1666721142756833/end/5 (pid 53090)] Flow is complete
[1666721142756833/end/5 (pid 53090)] Task finished successfully.
Done!

We can see from the Metaflow logs that our model looks promising: it performs better than the baseline. However, computing the baseline isn't just meant for the logs! We should use it alongside other tests to gate which models make it to production.
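For example, you could read these metrics back with Metaflow's Client API and apply a gating check. The promotion logic below is a hypothetical sketch, not this course's final approach:

from metaflow import Flow

run = Flow('BranchNLPFlow').latest_successful_run
# Promote the candidate only if it beats the baseline on both metrics.
if (run.data.model_acc > run.data.base_acc
        and run.data.model_rocauc > run.data.base_rocauc):
    print(f'Run {run.id}: candidate model beats the baseline.')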

In the next lesson, you will learn how to test your models and use tagging to manage which models are promoted to production.