New in Metaflow: Train at scale with AI/ML frameworks

Today, we are releasing a major new capability in Metaflow: you can now use Metaflow for distributed high-performance computing in general, and for large-scale AI training in particular, using popular frameworks: PyTorch, Ray, Tensorflow, Deepspeed, and MPI, each described in this article. For a higher-level motivation for using Metaflow to handle compute, see the first part of the announcement.

Since its inception, Metaflow has supported a number of compute patterns:

  1. Local, non-distributed compute utilizing parallel CPU cores and local GPUs.
  2. Vertical scaling using the @resources decorator.
  3. Task parallelism through static branches.
  4. Horizontal scaling through dynamic foreach branches.

Today, we are adding a new pattern to this list:

  5. Ephemeral compute clusters through framework-specific extensions, described below.

See the first part of this announcement, Compute Anything with Metaflow, for a high-level overview of these patterns. This article introduces each new extension in more detail.

Stay tuned for more detailed tutorials and deep dives into each framework in future posts!

Trying the new extensions at home

To run the code examples in this post, make sure you have a Metaflow deployment that is configured to use AWS Batch, which you can deploy by using these instructions.

Note that this release doesn't include support for Kubernetes yet. If you are interested in using these frameworks on Kubernetes, contact us to get started.

Each extension is released as a separate package, which you have to pip install separately from Metaflow itself. However, once installed, Metaflow will package the extension automatically for remote execution, so you don't need to include the extension in the @pypi or @conda decorators.

Note that as the extensions are not part of the Metaflow package itself, they are not subject to Metaflow's strict backwards compatibility guarantee, and hence their APIs may change over time. This is necessary because the frameworks behind the extensions evolve rather quickly too.

Under the hood: The @parallel decorator

You can skip this section if you just want to use one of the new extensions. Or read on if you are curious to understand how the extensions work under the hood.

All the new extensions follow this foundational pattern:

In contrast to Metaflow's trusted foreach pattern, the crucial difference here is that the tasks may communicate with each other, whereas tasks in a foreach are independent and isolated from each other. In this case, one of the tasks is designated as a leader or control task, which coordinates work amongst the worker tasks.

In other words, tasks form a tightly interconnected cluster, instead of being executed in an embarrassingly parallel fashion as with foreach. Crucially, all the tasks forming such a cluster need to execute simultaneously in parallel, to allow them to communicate with each other and the control node.

Metaflow includes a low-level decorator called @parallel which takes care of setting up a suitable cluster on the fly. Most importantly, the @parallel decorator takes care of gang-scheduling a set of compute nodes for tasks using the underlying compute layer, for instance leveraging multinode jobs in AWS Batch. Once a cluster is up and running, @parallel designates one of the nodes as the control node, informing the worker nodes of the address of the control through environment variables.
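To make the last point more concrete, here is a minimal sketch of how a task could work out its role in the cluster from such environment variables. The variable names below and the convention that node 0 acts as the control node are assumptions made for illustration, so check the Metaflow source for the names it actually sets.

```python
import os
from dataclasses import dataclass

# Assumed variable names, used here for illustration only.
MAIN_IP_VAR = "MF_PARALLEL_MAIN_IP"
NUM_NODES_VAR = "MF_PARALLEL_NUM_NODES"
NODE_INDEX_VAR = "MF_PARALLEL_NODE_INDEX"


@dataclass(frozen=True)
class ClusterInfo:
    main_ip: str      # address of the control node
    num_nodes: int    # total number of nodes in the cluster
    node_index: int   # index of this node within the cluster

    @property
    def is_control(self) -> bool:
        # Assumption: the node with index 0 acts as the control node.
        return self.node_index == 0


def read_cluster_info(env=None) -> ClusterInfo:
    """Parse cluster membership from environment-style key/value pairs."""
    env = os.environ if env is None else env
    return ClusterInfo(
        main_ip=env[MAIN_IP_VAR],
        num_nodes=int(env[NUM_NODES_VAR]),
        node_index=int(env[NODE_INDEX_VAR]),
    )


# Simulate the environment of the second node in a two-node cluster.
worker = read_cluster_info({
    MAIN_IP_VAR: "10.0.0.5",
    NUM_NODES_VAR: "2",
    NODE_INDEX_VAR: "1",
})
print(worker.is_control, worker.main_ip)
```

A framework-specific extension could use information like this to start the framework's head process on the control node and point the workers at its address.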

Currently, the only parameter you need to provide for @parallel is the new num_parallel option in the self.next expression, which determines the number of nodes in the cluster. The cluster is formed according to the @resources request of the subsequent step, which executes inside the cluster.

All these operations are rather low-level and they are not exposed to the end user directly. Instead, all the extensions use the @parallel decorator under the hood to create a cluster, on top of which a framework-specific compute cluster can be instantiated.

As a result, the developer can focus on using the frameworks as usual - almost all framework-specific examples you can find online should work as-is with Metaflow - while Metaflow takes care of the low-level infrastructure.

Ray

Ray is an open-source compute framework for scaling AI and Python workloads. It is a popular option for scheduling distributed training jobs in the cloud due to its powerful distributed compute primitives and the easy access that Ray’s Python SDKs provide to them.

Besides distributed training, Ray provides higher-level interfaces like Ray Tune for hyperparameter tuning and Ray Serve for inference. This discussion focuses on training use cases.

The new @metaflow_ray decorator was contributed by Autodesk, which uses Metaflow to power its ML workloads in general. With the new Ray integration, developers can embed their workloads written in Ray natively in their Metaflow workflows without having to worry about setting up Ray clusters separately. Metaflow takes care of creating ephemeral Ray clusters on the fly.

Getting started

Run pip install metaflow-ray to get started, and find examples here, many of which have Ray code directly copy-and-pasted from Ray’s extensive documentation. Here is a basic outline of what Metaflow user code looks like when using the @metaflow_ray decorator, which does these things:

  1. Forms a Ray cluster of two nodes dynamically at Metaflow task runtime, and
  2. Runs the code in the Metaflow @step function on the cluster.

from metaflow import FlowSpec, step, current, metaflow_ray

class HelloRay(FlowSpec):

    def _do_ray_job(self):

        import ray
        ray.init()
        print("Ray initialized in the %s step." % current.step_name)
        for k, v in ray.cluster_resources().items():
            if "memory" in k.lower():
                print("%s: %sGB" % (k, round(int(v) / (1024 * 1024 * 1024), 2)))
            else:
                print("%s: %s" % (k, v))

    @step
    def start(self):
        self.next(self.my_ray_code, num_parallel=2)

    @metaflow_ray
    @step
    def my_ray_code(self):
        self._do_ray_job()
        self.next(self.join)

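    @step
    def join(self, inputs):
        # A join step after a num_parallel split receives the parallel tasks as inputs
        self._do_ray_job()
        self.next(self.end)

    @step
    def end(self):
        self._do_ray_job()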

if __name__ == "__main__":
    HelloRay()

Tensorflow

Tensorflow is the OG of modern deep learning frameworks. Companies have been using Tensorflow with Metaflow for years without special integrations: You can execute Tensorflow code as a part of your @steps which may access multiple GPUs.

The new @tensorflow decorator adds support for Tensorflow's native distribution functionality, Tensorflow Distributed, which distributes operations across multiple GPUs, potentially across multiple nodes.

The @tensorflow decorator allows you to run your custom training loops or keras.Model().fit() programs on one or multiple workers using any of the built-in strategies. When using the decorator, you don't need to worry about setting up the cluster and the TF_CONFIG environment variable on each worker, as the decorator does this for you.
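For context, TF_CONFIG is a JSON string that lists every worker in the cluster and identifies the current one. The sketch below shows the kind of value each worker would need in a two-node setup. The host addresses and the helper name make_tf_config are hypothetical, and how the decorator builds the value internally is not covered here.

```python
import json


def make_tf_config(worker_hosts, task_index):
    """Build the TF_CONFIG value for one worker in a multi-worker cluster."""
    return json.dumps({
        # Every worker sees the same, identically ordered list of addresses.
        "cluster": {"worker": list(worker_hosts)},
        # Each worker identifies itself by its position in that list.
        "task": {"type": "worker", "index": task_index},
    })


# Hypothetical addresses for a two-node cluster.
hosts = ["10.0.0.5:12345", "10.0.0.6:12345"]
tf_configs = [make_tf_config(hosts, i) for i in range(len(hosts))]
for cfg in tf_configs:
    print(cfg)
```

Without the decorator, you would have to export a value like this on every node before creating a MultiWorkerMirroredStrategy.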

Getting started

Run pip install metaflow-tensorflow to get started, and find getting started examples here. Here is a basic outline of what Metaflow user code looks like when using the @tensorflow decorator:

from metaflow import FlowSpec, step, batch, tensorflow

class MultiNodeTensorFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.train, num_parallel=2)

    @batch(gpu=2, image="tensorflow/tensorflow:latest-gpu")
    @tensorflow
    @step
    def train(self):
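        import tensorflow as tf
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
        with strategy.scope():
            # build_keras_model is a placeholder for your own model-building function
            model = build_keras_model()
        # dataset is a placeholder for your own training data
        model.fit(dataset)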
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    MultiNodeTensorFlow()

PyTorch Distributed and Torchrun

PyTorch is the most popular deep learning framework in 2023. Similar to the Tensorflow integration, the new @torchrun decorator integrates with Torch's native distribution mechanism, Torch Distributed, which supports a variety of patterns for distributed model training.

Torch Distributed provides a convenience layer, torchrun, which makes it easier to launch torch.distributed scripts on the command line, assuming you have a suitable cluster already set up.

The new @torchrun decorator takes care of setting up the cluster and calling your PyTorch script, which can be a PyTorch Lightning script or regular PyTorch code. Crucially, you don't need to change anything in your Torch code to benefit from the new decorator.

When you annotate a Metaflow step with @torchrun, you expose a new call on the Metaflow current object. Specifically, you can copy-paste the torchrun command you use into this form inside your Metaflow step:

current.torch.run(
    entrypoint="your-script.py",
    entrypoint_args={"param1": "1", "param2": "2", …},
    nproc_per_node=1
)
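To see how such a call relates to a torchrun command line, here is a rough sketch that assembles the equivalent command for one node. The helper torchrun_command and the addresses are hypothetical, and the way the extension translates arguments internally is an assumption. The torchrun flags themselves (--nnodes, --nproc_per_node, --node_rank, --master_addr, --master_port) are the standard ones.

```python
import shlex


def torchrun_command(entrypoint, entrypoint_args=None, nproc_per_node=1,
                     nnodes=1, node_rank=0,
                     master_addr="127.0.0.1", master_port=29500):
    """Assemble a torchrun command line as a list of arguments."""
    cmd = [
        "torchrun",
        f"--nnodes={nnodes}",
        f"--nproc_per_node={nproc_per_node}",
        f"--node_rank={node_rank}",
        f"--master_addr={master_addr}",
        f"--master_port={master_port}",
        entrypoint,
    ]
    # Assumption: each dictionary entry becomes a "--name value" pair
    # that is passed through to the training script.
    for name, value in (entrypoint_args or {}).items():
        cmd += [f"--{name}", str(value)]
    return cmd


# The command for the second node of a hypothetical two-node cluster.
cmd = torchrun_command(
    "your-script.py",
    {"param1": "1", "param2": "2"},
    nnodes=2,
    node_rank=1,
    master_addr="10.0.0.5",
)
print(shlex.join(cmd))
```

The cluster-level values (number of nodes, node rank, and control node address) are exactly the details you would otherwise have to supply by hand on every node.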

Getting started

Run pip install metaflow-torchrun to get started, and find examples here, many of which have PyTorch code directly copy-and-pasted from the Torch distributed documentation. Here is a basic outline of what Metaflow user code looks like when using the @torchrun decorator:

from metaflow import FlowSpec, step, torchrun, current, batch

class HelloTorchrun(FlowSpec):

    @step
    def start(self):
        self.next(self.torch_multinode, num_parallel=2)

    @batch(image="pytorch/pytorch:latest", cpu=2)
    @torchrun
    @step
    def torch_multinode(self):
        current.torch.run(entrypoint="my-torch-script.py")
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    HelloTorchrun()

Note that my-torch-script.py gets packaged by Metaflow automatically for remote execution as long as it exists in the same directory as your flow code.

Deepspeed

Deepspeed is a fast-emerging framework for distributed training, originally created at Microsoft. It is built for the largest of use cases, such as training a 180B parameter model.

Working with Deepspeed is primarily about learning how to set the Deepspeed config based on factors such as model size, the compute power in your cluster, how much memory each GPU has, and whether you are using quantization. One of the fundamental innovations of Deepspeed is the Zero Redundancy Optimizer (ZeRO), which has numerous parameters, all centered on offloading some parts of a large model’s state from GPU memory to CPU memory.

As of today, these parameters are exposed to you directly, but we may add reasonable defaults to the extension over time.
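As an illustration of what these parameters look like, here is a minimal sketch of a Deepspeed config that enables ZeRO stage 3 and offloads optimizer state and parameters to CPU memory. The keys are standard Deepspeed config keys, but the values are placeholders rather than recommendations, so tune them for your model and hardware.

```python
import json

ds_config = {
    # Placeholder batch size; adjust to what fits in GPU memory.
    "train_micro_batch_size_per_gpu": 4,
    "fp16": {"enabled": True},
    "zero_optimization": {
        # Stage 3 partitions optimizer state, gradients, and parameters.
        "stage": 3,
        # Keep optimizer state in CPU memory instead of GPU memory.
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
        # Keep parameters in CPU memory (only available with stage 3).
        "offload_param": {"device": "cpu", "pin_memory": True},
    },
}

# Serialize to the JSON form that Deepspeed reads from a config file.
ds_config_json = json.dumps(ds_config, indent=2)
print(ds_config_json)
```

A training script typically receives the path to a file with this content through a command-line argument such as --deepspeed_config, depending on how the script parses its arguments.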

As with @torchrun above, when you annotate a Metaflow step with @deepspeed, you expose a new call on the current object. You can copy and paste the Deepspeed command you use into this form inside your Metaflow step:

current.deepspeed.run(
    entrypoint="train.py",
    entrypoint_args=["--checkpoint_dir", "experiment_deepspeed"]
)

Getting started

Run pip install metaflow-deepspeed to get started, and find examples here, including how to train BERT, or run our fork of Databricks Dolly fine-tuning repo on multiple nodes. Here is a basic outline of what Metaflow user code looks like when using the @deepspeed decorator:

from metaflow import FlowSpec, step, deepspeed, current, batch

class HelloDeepspeed(FlowSpec):

    @step
    def start(self):
        self.next(self.train, num_parallel=4)

    @batch(gpu=1, memory=24000, cpu=8)
    @deepspeed
    @step
    def train(self):
        current.deepspeed.run(entrypoint="hi-deepspeed.py")
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    HelloDeepspeed()

Note that hi-deepspeed.py gets packaged by Metaflow automatically for remote execution as long as it exists in the same directory as your flow code.

MPI

If Tensorflow is the OG of modern deep learning frameworks, Message Passing Interface or MPI could be characterized as the OG of modern High-Performance Computing (HPC). It was created more than three decades ago and it has been powering compute-intensive workloads on supercomputers since.

MPI by itself is a low-level standard with multiple implementations, notably the open-source OpenMPI. Custom programs use these implementations to pass messages between compute tasks in a low-latency, high-throughput fashion.

Historically, MPI programs are written in C/C++ or Fortran, or in a language that can interface with these lower-level languages, such as Python with its mpi4py package.

In order to pass messages between nodes, users need to be logged into the head (control) node of an MPI cluster, which requires certain communication channels to be open between the nodes. Metaflow's new @mpi decorator opens these communication channels for you, so you just need to worry about the MPI code.

Annotating steps with this decorator also adds four new methods that you can call through the current object:

  • current.mpi.exec: arguments of this function match the mpiexec command structure
  • current.mpi.run: arguments of this function match the mpirun command structure
  • current.mpi.cc: arguments of this function match the mpicc command structure
  • current.mpi.broadcast_file: this function takes one argument, a file existing on the control node, to be distributed to all other nodes. Use this to pass a binary compiled on the control node to all workers.

Getting started

Run pip install metaflow-mpi to get started, and find examples here, including how to run an mpi4py program - as well as complete routines for compiling, broadcasting, and running your MPI C programs inside Metaflow tasks. Here is a basic outline of what Metaflow user code looks like when using the @mpi decorator:

# flow.py

from metaflow import FlowSpec, step, batch, mpi, current

class MPI4PyFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.multinode, num_parallel=4)

    @batch(cpu=4, memory=16000)
    @mpi
    @step
    def multinode(self):
        current.mpi.exec(
            args=["-n", "16"],
            program="python my_mpi4py_script.py",
        )
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    MPI4PyFlow()
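The flow above requests four nodes with four CPUs each and launches 16 processes, which is consistent with running one MPI rank per CPU. Here is a minimal sketch of that arithmetic, assuming one rank per CPU. The helper names and host names are hypothetical and not part of metaflow-mpi, and whether the extension relies on a hostfile internally is not stated in this article. The "slots=" notation is the one used by Open MPI hostfiles.

```python
def mpiexec_args(num_nodes, ranks_per_node):
    """Return the process-count arguments for mpiexec."""
    total_ranks = num_nodes * ranks_per_node
    return ["-n", str(total_ranks)]


def hostfile_lines(hosts, slots_per_host):
    """Return Open MPI style hostfile lines, one per node."""
    return [f"{host} slots={slots_per_host}" for host in hosts]


# Four nodes with four CPUs each, as in the flow above.
args = mpiexec_args(num_nodes=4, ranks_per_node=4)
lines = hostfile_lines(["node-0", "node-1", "node-2", "node-3"], 4)
print(args)
print("\n".join(lines))
```

If you change num_parallel or the cpu value in @batch, remember to adjust the -n argument so that the number of ranks still matches the resources you requested.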

Summary

All the extensions follow the same common pattern: They allow you to set up ephemeral compute clusters on the fly, as a part of a larger workflow. You can then use your favorite frameworks to build larger models, fine-tune more efficiently, and handle more data.

Compute of this nature used to require highly specialized software stacks and dedicated engineers - a far cry from providing straightforward access to compute for most users. Furthermore, the setups tended to be finicky enough that they warranted special treatment outside of usual production systems.

We believe that compute - including distributed compute and model training - is a crucial part of the full stack for ML and AI which needs to integrate seamlessly with the rest of the stack. With the new decorators, you can bridge sophisticated model training and other demanding compute steps into larger ML/AI systems, benefiting from all the functionality of Metaflow, from fast data access and visualization to tracking and robust production orchestration.

If you want to start using the new extensions, we would love to hear from you! Join thousands of other AI, ML, and data developers in the Metaflow Community Slack and ask anything on #ask-metaflow!
