Today, we are releasing a major new capability in Metaflow: you can now use Metaflow for distributed high-performance computing in general, and large-scale AI training in particular, using popular frameworks such as PyTorch, Ray, Tensorflow, Deepspeed, and MPI, as described in this article. For a higher-level motivation for using Metaflow to handle compute, see the first part of the announcement.
Since its inception, Metaflow has supported a number of compute patterns:
- Local, non-distributed compute utilizing parallel CPU cores and local GPUs.
- Vertical scaling using the `@resources` decorator.
- Task parallelism through static branches.
- Horizontal scaling through dynamic foreach branches.
Today, we are adding a new pattern to this list:
- Ephemeral compute clusters through framework-specific extensions, described below.
See the first part of this announcement, Compute Anything with Metaflow, for a high-level overview of these patterns. This article introduces each new extension in more detail.
Stay tuned for more detailed tutorials and deep dives into each framework in future posts!
Trying the new extensions at home
To run the code examples in this post, make sure you have a Metaflow deployment that is configured to use AWS Batch, which you can deploy by using these instructions.
Specifically, this release doesn't include support for Kubernetes yet. If you are interested in using these frameworks on Kubernetes, contact us to get started.
Each extension is released as a separate package, which you have to `pip install` separately from Metaflow itself. However, once installed, Metaflow will package the extension automatically for remote execution, so you don't need to include the extension in the `@pypi` or `@conda` decorators.
Note that as the extensions are not part of the Metaflow package itself, they are not subject to Metaflow's strict backwards compatibility guarantee, so their APIs may change over time. This is necessary as the frameworks behind the extensions evolve rather quickly, too.
Under the hood: The `@parallel` decorator
You can skip this section if you just want to use one of the new extensions. Or read on if you are curious to understand how the extensions work under the hood.
All the new extensions follow this foundational pattern:
In contrast to Metaflow's trusted `foreach` pattern, the crucial difference here is that the tasks may communicate with each other, whereas tasks in a `foreach` are independent and isolated from each other. In this case, one of the tasks is designated as a leader, or a control task, which coordinates work amongst the worker tasks.
In other words, the tasks form a tightly interconnected cluster instead of being executed in an embarrassingly parallel fashion as with `foreach`. Crucially, all the tasks forming such a cluster need to execute simultaneously in parallel, to allow them to communicate with each other and with the control node.
Metaflow includes a low-level decorator called `@parallel`, which takes care of setting up a suitable cluster on the fly. Most importantly, the `@parallel` decorator takes care of gang-scheduling a set of compute nodes for tasks using the underlying compute layer, for instance by leveraging multinode jobs in AWS Batch. Once a cluster is up and running, `@parallel` designates one of the nodes as the control node, informing the worker nodes of the address of the control node through environment variables.
Currently, the only parameter you need to provide to `@parallel` is the new `num_parallel` option in the `self.next` expression, which determines the number of nodes in the cluster. The cluster is formed according to the `@resources` request of the subsequent step that executes inside the cluster.
All these operations are rather low-level and they are not exposed to the end user directly. Instead, all the extensions use the `@parallel` decorator under the hood to create a cluster, on top of which a framework-specific compute cluster can be instantiated.
As a result, the developer can focus on using the frameworks as usual - almost all framework-specific examples you can find online should work as-is with Metaflow - while Metaflow takes care of the low-level infrastructure.
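For the curious, here is a minimal sketch of the bare pattern, assuming the low-level `parallel` decorator is imported from the top-level `metaflow` package like the other decorators. In practice, you would typically use one of the framework-specific decorators described below instead:

```python
from metaflow import FlowSpec, step, resources, parallel


class BareParallelFlow(FlowSpec):

    @step
    def start(self):
        # num_parallel determines the number of nodes in the ephemeral cluster.
        self.next(self.cluster_step, num_parallel=2)

    @resources(cpu=2, memory=8000)  # each node in the cluster gets this request
    @parallel
    @step
    def cluster_step(self):
        # This code runs simultaneously on every node of the cluster;
        # one of the nodes is designated as the control node.
        print("hello from one node of the cluster")
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    BareParallelFlow()
```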
Ray
Ray is an open-source compute framework for scaling AI and Python workloads. It is a popular option for scheduling distributed training jobs in the cloud due to its powerful distributed compute primitives and the easy access that Ray's Python SDKs provide on top of them.
Besides distributed training, Ray provides higher-level interfaces like Ray Tune for hyperparameter tuning and Ray Serve for inference. This discussion focuses on training use cases.
The new `@metaflow_ray` decorator was contributed by Autodesk, which uses Metaflow to power their ML workloads in general. With the new Ray integration, developers can embed workloads written in Ray natively in their Metaflow workflows without having to worry about setting up Ray clusters separately. Metaflow takes care of creating ephemeral Ray clusters on the fly.
Getting started
Run `pip install metaflow-ray` to get started, and find examples here, many of which have Ray code directly copy-and-pasted from Ray's extensive documentation. Here is a basic outline of what Metaflow user code looks like when using the `@metaflow_ray` decorator, which does these things:
- Forms a Ray cluster of two nodes dynamically at Metaflow task runtime, and
- Runs the code in the Metaflow `@step` function on the cluster.
```python
from metaflow import FlowSpec, step, current, metaflow_ray


class HelloRay(FlowSpec):

    def _do_ray_job(self):
        # Connects to the Ray cluster and prints its aggregate resources.
        import ray

        ray.init()
        print("Ray initialized in the %s step." % current.step_name)
        for k, v in ray.cluster_resources().items():
            if "memory" in k.lower():
                print("%s: %sGB" % (k, round(int(v) / (1024 * 1024 * 1024), 2)))
            else:
                print("%s: %s" % (k, v))

    @step
    def start(self):
        self.next(self.my_ray_code, num_parallel=2)

    @metaflow_ray
    @step
    def my_ray_code(self):
        self._do_ray_job()
        self.next(self.join)

    @step
    def join(self, inputs):
        self._do_ray_job()
        self.next(self.end)

    @step
    def end(self):
        self._do_ray_job()


if __name__ == "__main__":
    HelloRay()
```
Tensorflow
Tensorflow is the OG of modern deep learning frameworks. Companies have been using Tensorflow with Metaflow for years without special integrations: you can execute Tensorflow code as a part of your `@step` functions, which may access multiple GPUs.
The new `@tensorflow` decorator adds support for Tensorflow's native distribution functionality, Tensorflow Distributed, which distributes operations across multiple GPUs, potentially across multiple nodes.
The `@tensorflow` decorator allows you to run your custom training loops or `keras.Model().fit()` programs on one or multiple workers using any of the built-in strategies. When using the decorator, you don't need to worry about setting up the cluster and the `TF_CONFIG` environment variable on each worker, as the decorator does this for you.
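For context, this is roughly the kind of `TF_CONFIG` value the decorator assembles on each worker behind the scenes (illustrative addresses and worker index; you never have to write this yourself):

```python
import json
import os

# Hypothetical two-worker cluster; this is worker 0, and worker 1 would use "index": 1.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"worker": ["10.0.0.1:2222", "10.0.0.2:2222"]},
    "task": {"type": "worker", "index": 0},
})
```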
Getting started
Run `pip install metaflow-tensorflow` to get started, and find getting started examples here. Here is a basic outline of what Metaflow user code looks like when using the `@tensorflow` decorator:
```python
from metaflow import FlowSpec, step, batch, tensorflow


class MultiNodeTensorFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.train, num_parallel=2)

    @batch(gpu=2, image="tensorflow/tensorflow:latest-gpu")
    @tensorflow
    @step
    def train(self):
        import tensorflow as tf

        # The decorator sets up TF_CONFIG, so the strategy discovers all workers.
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
        with strategy.scope():
            # build_keras_model() and dataset are placeholders for your own
            # model-building code and input pipeline.
            model = build_keras_model()
        model.fit(dataset)
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    MultiNodeTensorFlow()
```
PyTorch Distributed and Torchrun
PyTorch is the most popular deep learning framework in 2023. Similar to the Tensorflow integration, the new `@torchrun` decorator integrates with Torch's native distribution mechanism, Torch Distributed, which supports a variety of patterns for distributed model training.
Torch Distributed provides a convenience layer, `torchrun`, which makes it easier to launch `torch.distributed` scripts on the command line, assuming you have a suitable cluster already set up.
The new `@torchrun` decorator takes care of setting up the cluster and calling your PyTorch script, which can be a PyTorch Lightning script or regular PyTorch code. Crucially, you don't need to change anything in your Torch code to benefit from the new decorator.

When you annotate a Metaflow step with `@torchrun`, you expose a new call on the Metaflow `current` object. Specifically, you can copy-paste the `torchrun` command you use into this form inside your Metaflow step:
```python
current.torch.run(
    entrypoint="your-script.py",
    entrypoint_args={"param1": "1", "param2": "2", ...},
    nproc_per_node=1,
)
```
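The mapping is meant to be direct: the call above corresponds roughly to running something like `torchrun --nproc_per_node=1 your-script.py --param1 1 --param2 2` on each node, with the decorator supplying the multi-node rendezvous details (node ranks, the control node's address) that you would otherwise have to manage yourself.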
Getting started
Run `pip install metaflow-torchrun` to get started, and find examples here, many of which have PyTorch code directly copy-and-pasted from the Torch distributed documentation. Here is a basic outline of what Metaflow user code looks like when using the `@torchrun` decorator:
```python
from metaflow import FlowSpec, step, torchrun, current, batch


class HelloTorchrun(FlowSpec):

    @step
    def start(self):
        self.next(self.torch_multinode, num_parallel=2)

    @batch(image="pytorch/pytorch:latest", cpu=2)
    @torchrun
    @step
    def torch_multinode(self):
        current.torch.run(entrypoint="my-torch-script.py")
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    HelloTorchrun()
```
Note that `my-torch-script.py` gets packaged by Metaflow automatically for remote execution, as long as it exists in the same directory as your flow code.
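The entrypoint script itself is just plain `torch.distributed` code. A minimal, illustrative `my-torch-script.py` (not taken from the linked examples) could look like this:

```python
# my-torch-script.py: an illustrative torch.distributed "hello world"
import torch.distributed as dist


def main():
    # torchrun sets RANK, WORLD_SIZE, MASTER_ADDR, etc. in the environment,
    # so init_process_group() can pick them up without extra arguments.
    # Use the "nccl" backend instead when training on GPUs.
    dist.init_process_group(backend="gloo")
    print("Hello from rank %d of %d" % (dist.get_rank(), dist.get_world_size()))
    dist.barrier()
    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```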
Deepspeed
Deepspeed is a fast-emerging framework for distributed training, originally created at Microsoft. It is built for the largest of use cases, such as training a 180B parameter model.
Working with Deepspeed is primarily about learning how to set the Deepspeed config, which depends on many factors: model size, the compute power in your cluster, how much memory each GPU has, whether you are using quantization, and so on. One of the fundamental innovations of Deepspeed is the Zero Redundancy Optimizer (ZeRO), which has numerous parameters, all centered on offloading parts of a large model's state from GPU memory to CPU memory.

As of today, these parameters are exposed to you directly, but it is possible that we will extend the extension to include reasonable defaults over time.
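For flavor, a ZeRO-oriented configuration typically looks something like the following (illustrative values only, not a recommendation; consult the Deepspeed documentation for the parameters that match your model and hardware):

```python
# Illustrative Deepspeed configuration with ZeRO stage 2 and CPU offloading.
ds_config = {
    "train_micro_batch_size_per_gpu": 8,
    "bf16": {"enabled": True},
    "zero_optimization": {
        "stage": 2,                              # partition optimizer state and gradients
        "offload_optimizer": {"device": "cpu"},  # move optimizer state to CPU memory
    },
}
```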
As with `@torchrun` above, when you annotate a Metaflow step with `@deepspeed`, you expose a new call on the `current` object. You can copy and paste the Deepspeed command you use into this form inside your Metaflow step:
```python
current.deepspeed.run(
    entrypoint="train.py",
    entrypoint_args=["--checkpoint_dir", "experiment_deepspeed"],
)
```
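As with `@torchrun`, this mirrors what you would otherwise type with the `deepspeed` launcher on the command line, roughly `deepspeed train.py --checkpoint_dir experiment_deepspeed`, except that the multi-node setup is handled for you.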
Getting started
Run `pip install metaflow-deepspeed` to get started, and find examples here, including how to train BERT or how to run our fork of the Databricks Dolly fine-tuning repo on multiple nodes. Here is a basic outline of what Metaflow user code looks like when using the `@deepspeed` decorator:
```python
from metaflow import FlowSpec, step, deepspeed, current, batch


class HelloDeepspeed(FlowSpec):

    @step
    def start(self):
        self.next(self.train, num_parallel=4)

    @batch(gpu=1, memory=24000, cpu=8)
    @deepspeed
    @step
    def train(self):
        current.deepspeed.run(entrypoint="hi-deepspeed.py")
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    HelloDeepspeed()
```
Note that `hi-deepspeed.py` gets packaged by Metaflow automatically for remote execution, as long as it exists in the same directory as your flow code.
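If you are wondering what the entrypoint can contain, a minimal, illustrative `hi-deepspeed.py` (again, not taken from the linked examples) might be as simple as this:

```python
# hi-deepspeed.py: an illustrative Deepspeed "hello world"
import argparse

import deepspeed
import torch

parser = argparse.ArgumentParser()
# The Deepspeed launcher passes --local_rank to the script it starts.
parser.add_argument("--local_rank", type=int, default=0)
args = parser.parse_args()

deepspeed.init_distributed()
print(
    "Hello from rank %d of %d"
    % (torch.distributed.get_rank(), torch.distributed.get_world_size())
)
```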
MPI
If Tensorflow is the OG of modern deep learning frameworks, the Message Passing Interface, or MPI, could be characterized as the OG of modern High-Performance Computing (HPC). It was created more than three decades ago and has been powering compute-intensive workloads on supercomputers ever since.
MPI by itself is a low-level standard with multiple implementations, notably including the open-source OpenMPI, which custom programs use to pass messages between compute tasks in a low-latency, high-throughput fashion.

Historically, MPI programs have been written in C/C++ or Fortran, or in a language that can interface with these lower-level languages, such as Python with its `mpi4py` package.
In order to pass messages between nodes, users need to be logged into the head (control) node of an MPI cluster, which needs certain communication channels between the nodes open. Metaflow's new `@mpi` decorator opens these communication channels for you, so you only need to worry about the MPI code.
Annotating steps with this decorator also adds four new methods to the `current` object that you can run:
- `current.mpi.exec`: arguments of this function match the `mpiexec` command structure
- `current.mpi.run`: arguments of this function match the `mpirun` command structure
- `current.mpi.cc`: arguments of this function match the `mpicc` command structure
- `current.mpi.broadcast_file`: this function takes one argument, a file existing on the control node, to be distributed to all other nodes. Use this to pass a binary compiled on the control node to all workers.
Getting started
Run `pip install metaflow-mpi` to get started, and find examples here, including how to run an `mpi4py` program, as well as complete routines for compiling, broadcasting, and running your MPI C programs inside Metaflow tasks. Here is a basic outline of what Metaflow user code looks like when using the `@mpi` decorator:
```python
# flow.py
from metaflow import FlowSpec, step, batch, mpi, current


class MPI4PyFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.multinode, num_parallel=4)

    @batch(cpu=4, memory=16000)
    @mpi
    @step
    def multinode(self):
        current.mpi.exec(
            args=["-n", "16"],
            program="python my_mpi4py_script.py",
        )
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    MPI4PyFlow()
```
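To complete the picture, a minimal, illustrative `my_mpi4py_script.py` (not taken from the linked examples) could look like this:

```python
# my_mpi4py_script.py: an illustrative mpi4py "hello world"
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Gather a greeting from every rank onto rank 0 and print them there.
greetings = comm.gather("hello from rank %d" % rank, root=0)
if rank == 0:
    print("MPI world size: %d" % size)
    for g in greetings:
        print(g)
```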
Summary
All the extensions follow the same common pattern: they allow you to set up ephemeral compute clusters on the fly, as a part of a larger workflow. You can then use your favorite frameworks to build larger models, fine-tune more efficiently, and handle more data.
Compute of this nature used to require highly specialized software stacks and dedicated engineers - a far cry from providing straightforward access to compute for most users. Furthermore, the setups tended to be finicky enough that they warranted special treatment outside of the usual production systems.
We believe that compute - including distributed compute and model training - is a crucial part of the full stack for ML and AI, and it needs to integrate seamlessly with the rest of the stack. With the new decorators, you can bridge sophisticated model training and other demanding compute steps into larger ML/AI systems, benefiting from all the functionality of Metaflow, from fast data access and visualization to tracking and robust production orchestration.
If you want to start using the new extensions, we would love to hear from you! Join thousands of other AI, ML, and data developers in the Metaflow Community Slack and ask anything on #ask-metaflow!
Start building today
Join our office hours for a live demo! Whether you're curious about Outerbounds or have specific questions - nothing is off limits.