Using Airflow with Metaflow

You can deploy workflows developed with Metaflow on Apache Airflow, a popular open-source workflow orchestrator.

For background on why the integration exists in the first place, see the Metaflow-Airflow announcement post. For user-facing documentation, see Scheduling Metaflow Flows with Apache Airflow. This section dives deeper into the technical details of the integration, which may be useful for engineers who operate Airflow and Metaflow.

How to deploy Airflow for Metaflow?

You can use your existing Airflow server, assuming it is configured to work with Kubernetes. To deploy a new Airflow server from scratch, follow our cloud-specific deployment guides.

You can also use the integration with many managed Airflow offerings; for instance, Amazon MWAA has been confirmed to work with it.

How does the integration work?

A Metaflow flow can be compiled into a native Airflow DAG simply by executing

python myflow.py airflow create myflow_airflow_dag.py

Behind the scenes, this command packages the user code in the current working directory and stores the package in the chosen datastore: S3, Azure Blob Storage, or Google Cloud Storage. It then compiles the code into an Airflow DAG, stored in the specified output file. The generated DAG reflects the currently active Metaflow configuration, which you can change with the METAFLOW_PROFILE environment variable.
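
For instance, to compile the DAG against a non-default Metaflow profile, where the profile name prod is illustrative and refers to ~/.metaflowconfig/config_prod.json:

METAFLOW_PROFILE=prod python myflow.py airflow create myflow_airflow_dag.py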

To schedule the DAG on Airflow, move the generated file, like myflow_airflow_dag.py above, to the Airflow scheduler's dags folder. Once the file is in place, the scheduler runs the workflow like any other native Airflow workflow.
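
On a self-managed scheduler this can be as simple as a copy; a sketch, assuming the default local dags folder under $AIRFLOW_HOME (managed services such as MWAA instead sync the folder from object storage):

cp myflow_airflow_dag.py $AIRFLOW_HOME/dags/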

Airflow DAGs created by Metaflow run tasks as Kubernetes pods using Airflow's KubernetesPodOperator.

Command-line options

Besides the usual Metaflow-specific options, the airflow create command includes some Airflow-specific options:

  • --is-paused-upon-creation - The generated DAG is created in a paused state.

  • --worker-pool - Sets the Airflow worker pool in which the DAG's tasks run.
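
For example, the following deploys a DAG that starts out paused and runs its tasks in a specific worker pool; a sketch where the pool name my_pool is illustrative and must already exist in Airflow:

python myflow.py airflow create --is-paused-upon-creation --worker-pool my_pool myflow_airflow_dag.py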

Configuring Metaflow for Airflow

As all Metaflow tasks orchestrated by Airflow run on Kubernetes using KubernetesPodOperator, Metaflow needs to be aware of Airflow's Kubernetes setup at the time when airflow create is executed.

There are three alternative ways to point Metaflow at Airflow's Kubernetes configuration, either in Metaflow's configuration file or as environment variables:

  1. METAFLOW_AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT - selects which Kubernetes context from ~/.kube/config to use.

  2. METAFLOW_AIRFLOW_KUBERNETES_KUBECONFIG_FILE - sets the path to a kubeconfig file on the Airflow installation; Airflow tasks will use this configuration file.

  3. METAFLOW_AIRFLOW_KUBERNETES_CONN_ID - makes Metaflow use an existing Kubernetes cluster connection defined in Airflow. You need to configure the connection in the Airflow UI or in Airflow's configuration.
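
If you prefer Metaflow's configuration file over environment variables, the same keys can be placed in the JSON file read by the active profile; a minimal sketch, assuming the default location ~/.metaflowconfig/config.json and an illustrative context name:

{
    "METAFLOW_AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT": "my-eks-context"
}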

For instance, you can deploy a flow as follows:

export METAFLOW_AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT=arn:aws:eks:us-west-2:00612345:cluster/mycluster-j345e 
python foreach.py airflow create foreachdag.py

FAQ

What is the minimum version of Airflow supported?

Airflow 2.2.0 or newer is required to support basic Metaflow decorators and flows with nested branches. Metaflow's foreaches are not supported on Airflow 2.2.0.

Airflow introduced dynamic task mapping in version 2.3.0, which is required if you want to use Metaflow's foreaches with Airflow. Nested foreaches are not supported by Airflow.

In addition, foreach support requires the Airflow installation to have the apache-airflow-providers-cncf-kubernetes provider installed, at a minimum version of 4.2.0.
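
For reference, the kind of flow that needs dynamic task mapping is one with a foreach split; a minimal sketch with illustrative names:

from metaflow import FlowSpec, step

class ForeachFlow(FlowSpec):

    @step
    def start(self):
        # Fan out over this list; each item becomes its own mapped Airflow task.
        self.items = ["a", "b", "c"]
        self.next(self.process, foreach="items")

    @step
    def process(self):
        # self.input holds the item assigned to this branch.
        self.result = self.input.upper()
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [i.result for i in inputs]
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    ForeachFlow()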

What compute layers are supported?

At this time only @kubernetes is supported - not @batch.
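
If your steps do not already carry the @kubernetes decorator, you can attach it at deployment time with Metaflow's --with option; a sketch where the resource values are illustrative:

python myflow.py --with kubernetes:cpu=2,memory=4096 airflow create myflow_airflow_dag.py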

What Airflow executors are supported?

Metaflow-compiled DAGs can run on any Airflow executor, such as LocalExecutor, KubernetesExecutor, or CeleryKubernetesExecutor.

Can I create multiple DAGs of the same flow?

When Metaflow creates an Airflow DAG file, it uses the name of the flow as the dag_id. The name of the flow is the name of the class that inherits from FlowSpec. When you move these files into the Airflow scheduler's dags folder, ensure that only one file exists for each dag_id.

Use the @project decorator to deploy multiple variants of the same flow. With @project set, the dag_id is derived from the project name and the branch that you specify.
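
A minimal sketch of a flow deployed under @project, with illustrative project and branch names:

from metaflow import FlowSpec, project, step

@project(name="forecasting")
class MyFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    MyFlow()

Deploying the flow with python myflow.py --branch staging airflow create myflow_staging_dag.py gives the DAG a dag_id that includes the project name and the staging branch, so it can coexist with DAGs generated from other branches of the same flow.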

caution

Ensure that only one dag file exists per dag_id. Having multiple DAGs deployed with the same dag_id can lead to anomalous behavior during task execution.

Can I use KubernetesExecutor?

Our Terraform templates deploy Airflow with a LocalExecutor. The deployment scheme recommended for the KubernetesExecutor requires that each worker pod has access to the DAG files; the Terraform templates we provide for AWS, GCP, and Azure do not set this up.

If you want to use KubernetesExecutor, you can set up your own deployment scheme and ways to manage DAG files as described here.

Can I use Metaflow with Airflow on Minikube?

Yes. Follow these instructions to run Airflow on Minikube.

How to configure K8s permissions for Metaflow-Airflow?

Two different sets of permissions are involved:

Kubernetes Permissions for Airflow Components

Airflow components require certain permissions on Kubernetes to launch pods. If you are deploying Airflow on Kubernetes, ensure that the service account of the Airflow worker or scheduler has the right role bindings to create pods in the required Kubernetes namespace. You can find the permissions needed for the role over here.
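
One way to verify the setup is to ask Kubernetes directly whether the service account may create pods; a sketch where the namespace and service account names are illustrative:

kubectl auth can-i create pods --namespace airflow --as system:serviceaccount:airflow:airflow-worker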

If Airflow is deployed using the Helm chart provided by Apache, the chart automatically ensures that the scheduler and the workers can launch pods on Kubernetes.
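
For reference, a typical installation with the official chart looks roughly like the following, with illustrative release and namespace names:

helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace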

Cloud Storage Access Permissions

Metaflow-related pods on Kubernetes require permissions to access the cloud datastore (S3, Azure Blob Storage, or Google Cloud Storage). On AWS, you can set the METAFLOW_KUBERNETES_SERVICE_ACCOUNT configuration variable or use @kubernetes(service_account='myserviceaccount') to ensure that the pods get the correct permissions automatically.
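
A minimal sketch of attaching a service account to a step; the service account name is illustrative and must exist in the namespace where the pods run, with IAM access to the datastore:

from metaflow import FlowSpec, kubernetes, step

class DatastoreAccessFlow(FlowSpec):

    # The pod for this step runs under the given Kubernetes service account,
    # which carries the cloud permissions needed to read and write the datastore.
    @kubernetes(service_account="airflow-metaflow-sa")
    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    DatastoreAccessFlow()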