Reuse Model Object

Question

How can I reuse model code in training and prediction flows?

Solution

A common pattern when using Metaflow is to move complex business logic outside of the flow. This makes the logic callable from multiple flows and more easily tested independent of the flow.

1. Make Class Used in Multiple Flows

Imagine you have the following model class:

model.py
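class Model():

    def init_model(self, model_type=None, params: dict = None):
        # Build an estimator of the given class; with no params, pass no kwargs.
        return model_type(**(params or {}))

    def train(self, model, features, labels):
        # scikit-learn's fit() returns the fitted estimator itself.
        return model.fit(features, labels)

    def score(self, model, features, true_labels):
        # Assumes labels that compare elementwise, such as NumPy arrays.
        preds = model.predict(features)
        return {
            "accuracy": sum(true_labels == preds) / len(true_labels)
        }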
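
Because the class has no Metaflow dependency, it can be exercised without running a flow. The following is a minimal sketch of such a check, not part of the original recipe. `MajorityClassifier` and `test_model_roundtrip` are hypothetical names, and the class is restated here with a `zip`-based accuracy so the sketch runs on plain lists without NumPy; in a real test you would likely `from model import Model` instead.

```python
from collections import Counter


class Model:
    # Restated from model.py for self-containment. The accuracy is computed
    # with zip() so plain Python lists work (an assumption of this sketch).
    def init_model(self, model_type=None, params=None):
        return model_type(**(params or {}))

    def train(self, model, features, labels):
        return model.fit(features, labels)

    def score(self, model, features, true_labels):
        preds = model.predict(features)
        correct = sum(t == p for t, p in zip(true_labels, preds))
        return {"accuracy": correct / len(true_labels)}


class MajorityClassifier:
    """Hypothetical stub estimator: predicts the most common training label."""

    def __init__(self, fallback=0):
        self.label_ = fallback

    def fit(self, features, labels):
        self.label_ = Counter(labels).most_common(1)[0][0]
        return self  # mirror scikit-learn, where fit() returns the estimator

    def predict(self, features):
        return [self.label_ for _ in features]


def test_model_roundtrip():
    logic = Model()
    clf = logic.init_model(model_type=MajorityClassifier, params={"fallback": 0})
    clf = logic.train(clf, [[0], [1], [2]], [1, 1, 0])
    # The stub predicts 1 everywhere, so 3 of the 4 labels below match.
    result = logic.score(clf, [[3], [4], [5], [6]], [1, 1, 0, 1])
    assert result == {"accuracy": 0.75}
    return result


result = test_model_roundtrip()
```

A stub estimator keeps the check fast and deterministic; the same test shape should work with a scikit-learn estimator and NumPy arrays.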

Now you can use multiple inheritance to mix the Model class into a flow: a class that inherits from both FlowSpec and Model can call these methods on self.
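
The mechanism is ordinary Python method resolution and can be sketched without Metaflow. In this illustration `BaseFlow` is a hypothetical stand-in for FlowSpec, `DemoFlow` and `build` are made-up names, and the built-in `dict` plays the role of the model type so that no ML library is needed.

```python
class BaseFlow:
    """Hypothetical stand-in for metaflow.FlowSpec (not the real class)."""

    def describe(self):
        return type(self).__name__


class Model:
    def init_model(self, model_type=None, params=None):
        return model_type(**(params or {}))


class DemoFlow(BaseFlow, Model):
    def build(self):
        # init_model is not defined on DemoFlow or BaseFlow, so Python
        # resolves it on Model by walking the method resolution order.
        return self.init_model(model_type=dict, params={"max_depth": 8})


flow = DemoFlow()
built = flow.build()
mro_names = [cls.__name__ for cls in DemoFlow.__mro__]
print(built)
print(mro_names)
```

Since lookup follows the order of the base classes, a method defined on both bases would be taken from the first one listed.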

2. Use Model Class in Training Flow

This flow demonstrates how a flow can inherit the Model class methods. The flow shows how to:

  • Load and split a dataset from scikit-learn.
  • Initialize a model using the previously defined class.
  • Train the model.
  • Score the model on the held-out test split and print the result.
train_model_flow.py
from metaflow import step, FlowSpec
from model import Model

class TrainingFlow(FlowSpec, Model):

    @step
    def start(self):
        from sklearn import datasets
        from sklearn.model_selection import train_test_split
        self.iris = datasets.load_iris()
        X, y = self.iris['data'], self.iris['target']
        self.labels = self.iris['target_names']
        split = train_test_split(X, y, test_size=0.2)
        self.X_train, self.X_test = split[0], split[1]
        self.y_train, self.y_test = split[2], split[3]
        self.next(self.make_model)

    @step
    def make_model(self):
        from sklearn.ensemble import RandomForestClassifier
        self.params = {"max_depth": 8}
        self.model = self.init_model(
            model_type=RandomForestClassifier,
            params=self.params
        )
        self.next(self.train_model)

    @step
    def train_model(self):
        self.model = self.train(self.model, self.X_train, self.y_train)
        self.next(self.end)

    @step
    def end(self):
        scores = self.score(self.model, self.X_test, self.y_test)
        print('Accuracy: ', scores['accuracy'])

if __name__ == "__main__":
    TrainingFlow()
python train_model_flow.py run
     Workflow starting (run-id 864):
[864/start/4336 (pid 17451)] Task is starting.
[864/start/4336 (pid 17451)] Task finished successfully.
[864/make_model/4337 (pid 17461)] Task is starting.
[864/make_model/4337 (pid 17461)] Task finished successfully.
[864/train_model/4338 (pid 17466)] Task is starting.
[864/train_model/4338 (pid 17466)] Task finished successfully.
[864/end/4339 (pid 17475)] Task is starting.
[864/end/4339 (pid 17475)] Accuracy: 0.9
[864/end/4339 (pid 17475)] Task finished successfully.
Done!

3. Use Model Class in Scoring Flow

You can use multiple inheritance again to define a second flow that reuses the same Model methods.

This flow shows how to:

  • Create a test dataset to score.
  • Retrieve the trained model object from the latest successful run of TrainingFlow.
  • Use the common Model class function to score the model on the test dataset.
scoring_model_flow.py
from metaflow import step, FlowSpec
from model import Model

class ScoringFlow(FlowSpec, Model):

    sibling_flow = 'TrainingFlow'

    @step
    def start(self):
        from sklearn import datasets
        iris = datasets.load_iris()
        self.X, self.y = iris['data'], iris['target']
        self.next(self.score_trained_model)

    @step
    def score_trained_model(self):
        from metaflow import Flow
        run = Flow(self.sibling_flow).latest_successful_run
        self.model = run['end'].task.data.model
        self.scores = self.score(self.model, self.X, self.y)
        self.next(self.end)

    @step
    def end(self):
        print('Accuracy: ', self.scores['accuracy'])

if __name__ == "__main__":
    ScoringFlow()
python scoring_model_flow.py run
     Workflow starting (run-id 865):
[865/start/4341 (pid 17487)] Task is starting.
[865/start/4341 (pid 17487)] Task finished successfully.
[865/score_trained_model/4342 (pid 17499)] Task is starting.
[865/score_trained_model/4342 (pid 17499)] Task finished successfully.
[865/end/4343 (pid 17511)] Task is starting.
[865/end/4343 (pid 17511)] Accuracy: 0.98
[865/end/4343 (pid 17511)] Task finished successfully.
Done!
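
The model lookup in score_trained_model could also be factored into a small helper that fails with a clear message when no training run has succeeded yet. This is a sketch under assumptions: `load_trained_model` is a hypothetical name, and it relies on the same client calls the flow above uses (`Flow`, `latest_successful_run`, and `run['end'].task.data.model`).

```python
def load_trained_model(flow_name="TrainingFlow", step_name="end"):
    """Hypothetical helper: return the model artifact from the latest
    successful run of flow_name."""
    # Imported lazily so that defining the helper does not require Metaflow.
    from metaflow import Flow

    run = Flow(flow_name).latest_successful_run
    if run is None:
        # No successful run exists yet, so there is no model to load.
        raise RuntimeError(
            f"No successful run of {flow_name} found; run the training flow first."
        )
    return run[step_name].task.data.model
```

Inside a step this would be called as `self.model = load_trained_model(self.sibling_flow)`.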

Further Reading