Use Keras with Metaflow

Question

How do I build and fit a Keras model in a Metaflow flow?

Solution

There are several ways to build Keras models. This example uses the Keras Sequential API inside a Metaflow flow.

The flow shows how to:

  • Load and preprocess data.
  • Make a Keras model.
  • Fit and save the model.
    • Keras models are not serializable with Pickle, which Metaflow uses to serialize artifacts. Instead of assigning the model object directly to self.model, the flow saves the model to disk in .h5 format, reads the file back as bytes, and stores those bytes in self.model.
  • Load the model and evaluate it on the test dataset.
fit_keras.py
from metaflow import FlowSpec, step
import tensorflow as tf
import tempfile
from keras_helpers import create_model

class KerasFlow(FlowSpec):

    @step
    def start(self):
        import numpy as np
        self.n_class = 10
        self.input_shape = (28, 28, 1)
        mnist = tf.keras.datasets.mnist.load_data()
        (x_train, y_train), (x_test, y_test) = mnist
        # Scale pixel values to the range [0, 1].
        x_train = x_train.astype("float32") / 255
        x_test = x_test.astype("float32") / 255
        # Add a channel dimension: (28, 28) -> (28, 28, 1).
        self.x_train = np.expand_dims(x_train, -1)
        self.x_test = np.expand_dims(x_test, -1)
        # One-hot encode the labels.
        self.y_train = tf.keras.utils.to_categorical(
            y_train, self.n_class)
        self.y_test = tf.keras.utils.to_categorical(
            y_test, self.n_class)
        self.next(self.fit_model)

    @step
    def fit_model(self):
        model = create_model(self.x_train, self.y_train,
                             self.input_shape,
                             self.n_class)
        # Save the model to a temporary file in .h5 format,
        # then store the file's bytes as the artifact.
        with tempfile.NamedTemporaryFile() as f:
            tf.keras.models.save_model(model, f.name,
                                       save_format='h5')
            self.model = f.read()
        self.next(self.evaluate_model)

    @step
    def evaluate_model(self):
        # Write the bytes back to disk so Keras can load the model.
        with tempfile.NamedTemporaryFile() as f:
            f.write(self.model)
            f.flush()
            model = tf.keras.models.load_model(f.name)
        # score is [loss, accuracy], matching the compiled metrics.
        self.score = model.evaluate(self.x_test,
                                    self.y_test,
                                    verbose=0)
        self.next(self.end)

    @step
    def end(self):
        print("Test loss:", self.score[0])
        print("Test accuracy:", self.score[1])

if __name__ == "__main__":
    KerasFlow()
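
The save-to-disk-then-read-bytes pattern used in fit_model and evaluate_model can be factored into two small helpers. The following is a minimal sketch, not part of the original flow: the names to_bytes, from_bytes, fake_save, and fake_load are hypothetical, and the stand-in saver and loader exist only so the sketch runs without TensorFlow. It assumes the loader reads the whole file into memory before returning, since the temporary directory is deleted afterwards.

```python
import os
import pickle
import tempfile


def to_bytes(save_fn, suffix=".h5"):
    """Call save_fn(path) on a temporary path and return the file's bytes."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "model" + suffix)
        save_fn(path)
        with open(path, "rb") as f:
            return f.read()


def from_bytes(data, load_fn, suffix=".h5"):
    """Write data to a temporary path and return load_fn(path)."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "model" + suffix)
        with open(path, "wb") as f:
            f.write(data)
        return load_fn(path)


# Hypothetical stand-ins for a model saver and loader.
def fake_save(path):
    with open(path, "wb") as f:
        f.write(b"weights")


def fake_load(path):
    with open(path, "rb") as f:
        return f.read()


blob = to_bytes(fake_save)
# Plain bytes survive a Pickle round trip, which is what an artifact needs.
restored_blob = pickle.loads(pickle.dumps(blob))
restored = from_bytes(restored_blob, fake_load)
```

In the flow, the saver would presumably be something like lambda path: tf.keras.models.save_model(model, path, save_format='h5') and the loader tf.keras.models.load_model. Writing to a path inside a temporary directory, rather than reopening a NamedTemporaryFile by name, avoids platform differences in whether an open temporary file can be opened a second time.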

The Keras code used in this flow comes from this MNIST example. The model is built, compiled, and fit in the create_model function of keras_helpers.py, which the flow imports.

keras_helpers.py
import tensorflow as tf

def create_model(x_train, y_train, input_shape,
                 n_class):
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=input_shape),
        tf.keras.layers.Conv2D(
            32, kernel_size=(3, 3), activation="relu"),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Conv2D(
            64, kernel_size=(3, 3), activation="relu"),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(n_class,
                              activation="softmax"),
    ])
    model.compile(loss="categorical_crossentropy",
                  optimizer="adam",
                  metrics=["accuracy"])
    model.fit(x_train, y_train,
              batch_size=32, epochs=1,
              validation_split=0.1)
    return model
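
To see how the layers in create_model fit together, the spatial size and parameter count can be worked out by hand. The sketch below is illustrative only and assumes the Keras defaults for these layers (no padding and stride 1 for Conv2D, stride equal to the pool size for MaxPooling2D). The helper names conv_out and pool_out are hypothetical.

```python
def conv_out(size, kernel):
    """Output size of an unpadded convolution with stride 1."""
    return size - kernel + 1


def pool_out(size, pool):
    """Output size of max pooling with stride equal to the pool size."""
    return size // pool


size, channels = 28, 1   # MNIST input: 28 x 28 pixels, 1 channel
params = 0
for filters in (32, 64):
    # Each filter has 3 * 3 * channels weights plus one bias.
    params += (3 * 3 * channels + 1) * filters
    size = pool_out(conv_out(size, 3), 2)
    channels = filters

flat = size * size * channels   # length of the Flatten output
params += (flat + 1) * 10       # Dense: flat * 10 weights plus 10 biases
print(size, flat, params)       # prints: 5 1600 34826
```

The image shrinks from 28 to 26 to 13 after the first convolution and pooling pair, then to 11 and 5 after the second, so the final Dense layer sees 5 * 5 * 64 = 1600 features. Dropout and Flatten add no trainable parameters.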

Run Flow

python fit_keras.py run
...
[1654221292500619/start/1 (pid 71403)] Task is starting.
[1654221292500619/start/1 (pid 71403)] Task finished successfully.
...
[1654221292500619/end/4 (pid 71613)] Task is starting.
[1654221292500619/end/4 (pid 71613)] Test loss: 0.0586005374789238
[1654221292500619/end/4 (pid 71613)] Test accuracy: 0.9815000295639038
[1654221292500619/end/4 (pid 71613)] Task finished successfully.
...
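
Because self.score is stored as an artifact, it can also be read back after the run finishes, for example from a notebook. The sketch below assumes the Metaflow Client API (Flow, latest_successful_run, and run.data) and at least one successful run of KerasFlow. The function names format_score and latest_score are hypothetical.

```python
def format_score(score):
    """Format the [loss, accuracy] list returned by model.evaluate."""
    loss, accuracy = score
    return f"loss={loss:.4f} accuracy={accuracy:.4f}"


def latest_score(flow_name="KerasFlow"):
    """Fetch the score artifact from the latest successful run."""
    # Imported here so format_score works without Metaflow installed.
    from metaflow import Flow

    run = Flow(flow_name).latest_successful_run
    if run is None:
        raise RuntimeError(f"No successful run found for {flow_name}")
    return run.data.score
```

Calling format_score(latest_score()) after the run shown above would be expected to report the same loss and accuracy that the end step printed.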

Further Reading