The latest Metaflow release, 2.12, introduces a new API for launching and managing flows, which enables a number of highly requested use cases such as executing flows in notebooks. Read more below and in the updated documentation.
Some features are so complex that they require a thorough explanation. Others are so unassumingly simple and easily overlooked that they deserve extra attention.
The highlight of Metaflow 2.12, a new Runner API for launching flows programmatically, belongs to the latter category. Through a straightforward API, it allows you to execute and manage flows in notebooks, CI/CD pipelines, advanced scripts, and even in asynchronous microservices. Keep on reading to see how.
The new Runner API
Since the first release, users have mainly used Metaflow on the command line. You write a flow as a file, say myflow.py
, and run it on the command line as follows:
python myflow.py run
With Metaflow 2.12, you can do the same in a Python script or a module:
with Runner('myflow.py').run() as running:
print(f'{running.run} completed')
This doesn’t seem that different from the command line, so what’s the point? An immediate benefit is that accessing any data produced by the run becomes straightforward:
with Runner('myflow.py').run() as running:
results = running.run.data
print(f'Model scored {results.model_score}')
The .run
attribute gives you access to Metaflow's Client API so you can inspect any data, logs, tags, or metadata produced by the run. Not only you can access results in Python, but you can pass in parameters as well which get type-checked on the fly:
with Runner('myflow.py').run(db='production', alpha=0.5) as running:
results = running.run.data
print(f'Model scored {results.model_score}')
Should you need more horsepower than what’s available on your local workstation, you can run the flow in the cloud requesting any compute resources, such as an instance with four GPUs:
with Runner("myflow.py", decospecs=["kubernetes:gpu=4"]).run(
db="production", alpha=0.5
) as running:
results = running.run.data
print(f"Model scored {results.model_score}")
It turns out that this new feature, which packs surprising capabilities behind a simple API, opens up a number of use cases that people have been asking for over the years. Let’s take a look at a few key examples.
Running Metaflow in a notebook
One of the most requested features has been the ability to run flows easily in a notebook. This quick video shows how you can use the new NBRunner utility to make it happen:
You can use a notebook as usual, define functions in cells and test them on the fly with small models and data. When you need more compute capacity to handle more data and larger models, you can quickly define a flow in a cell, utilizing the defined functions, and execute the whole package in the cloud requesting any amount of compute resources, including beefy GPUs.
A remarkable aspect of this is what the video doesn’t show: you don’t need to create Docker images or worry about packaging your Python functions, as Metaflow manages dependencies for you and ships the whole execution environment to the cloud. The end result is fully production-ready, benefiting from all production-oriented features of Metaflow, which is usually not the case with notebooks.
Running Metaflow in scripts with custom configurations
Advanced Metaflow projects, such as many of those at Netflix, end up wrapping Metaflow in their own project-specific scripts and configurations to support their specific ways of working. Some projects use advanced configuration management systems, such as Hydra by Meta, to manage parameters of Metaflow deployments and experiments.
The new Runner API and a framework like Hydra go together like peanut butter and jelly. Consider this stand-alone script that loads configuration using Hydra and maps it to the parameters of a (configurable) flow:
import hydra
from omegaconf import DictConfig, OmegaConf
from metaflow import Runner
@hydra.main(version_base=None, config_path="conf", config_name="config")
def run_flow(cfg):
config = OmegaConf.to_container(cfg)
flow = config.pop('flow')
with Runner(flow).run(**config) as running:
print(f'{running.run} completed')
if __name__ == "__main__":
run_flow()
Thanks to Hydra, you can use a single command to execute many experiments with different parameterizations, as shown here:
Running tests in a CI/CD pipeline
A few weeks back, we published an article about continuous delivery of ML/AI projects. A typical CI/CD pipeline includes a test harness, which in the case of ML/AI may involve, say, backtesting of candidate models with historical data. Such ML/AI tests can be computationally expensive to execute, so it makes sense to use Metaflow’s facilities to run them in parallel in the cloud.
Thanks to the new Runner API, you can launch tests in the cloud and evaluate results cleanly in Python without having to resort to messy shell scripts. This simple snippet gives you the idea (an actual test harness would likely be much more elaborate):
from metaflow import Runner
with Runner('model.py', decospecs=['kubernetes']).run() as running:
accuracy = running.run.data.model_accuracy
assert accuracy > 0.9
print(f'✅ Checks passed - model accuracy is {accuracy}')
Here’s the snippet in action on Github Actions:
Running flows asynchronously
The above examples demonstrated the blocking side of the Runner API. If you want to run flows inside (web) services that need to handle multiple requests concurrently, or you want to manage multiple parallel runs e.g. for hyperparamer search, or just control log streaming, the asynchronous Runner API is the answer. It leverages Python’s asynchronous APIs to manage concurrent runs in the background, so you can safely use it with other async frameworks to handle advanced, multi-tasking use cases.
For instance, this example from the documentation manages five parallel runs, launched with async_run
:
import asyncio
from metaflow import Runner
async def main():
# start five concurrent runs
runs = [await Runner('slowflow.py').async_run(seconds=i) for i in range(5)]
while runs:
print(f'{len(runs)} runs are still running')
still_running = []
for running in runs:
if running.status == 'running':
still_running.append(running)
else:
print(f'{running.run} ran for {running.run.data.secs} seconds')
runs = still_running
await asyncio.sleep(1)
asyncio.run(main())
Here’s how the parallel runs look like in the Metaflow UI:
Run() before it’s too late
The new Runner API is available in Metaflow 2.12, so you can simply
pip install metaflow
to get started. For instructions, take a look at the updated documentation.
As always, we’d love to hear your feedback and questions on the #ask-metaflow
channel on the Metaflow Community Slack, so join the community today.
You can get all the above functionality, including notebooks, scalable compute, CI/CD integration, and more as a managed service running securely in your cloud account. It takes only 15 minutes to get started!