Skip to main content

Schedule Flows on Argo Workflows

Question

How can I schedule flows to run at a specific time on Argo?

Solution

There is a Metaflow decorator for that!

Outerbounds user note

This guide assumes you have a functional Metaflow deployment with Argo enabled. Outerbounds users already have a managed Argo service running and can follow this guide right away. Self-hosted Metaflow users need their Metaflow admin to first deploy Argo in the Kubernetes cluster created when they deployed Metaflow.

1Scheduling Flows

You can use Metaflow's @schedule flow-level decorator to run on Argo Workflows, where Metaflow automatically maps a FlowSpec to an Argo workflow template, and a run of the flow to an Argo Workflow.

After deploying the script containing your flow to Argo workflows the execution of a FlowSpec can happen on any event-based trigger you setup or a time-based trigger defined with Metaflow's @schedule decorator.

2Run Flow

This flow is scheduled to run daily. Notice Metaflow's @schedule decorator has arguments that determine when the flow is run. Time based triggers you can use include:

  • @schedule(weekly=True) runs the workflow on Sundays at midnight.
  • @schedule(daily=True) runs the workflow every day at midnight.
  • @schedule(hourly=True) runs the workflow every hour.
  • @schedule(cron='0 10 * * ? *') runs the workflow at the given Cron schedule, in this case at 10am UTC every day. You can use the rules defined here to define the schedule for the cron option.
schedule_flow_argo.py
from metaflow import FlowSpec, schedule, step
from datetime import datetime

@schedule(daily=True)
class DailyFlowArgo(FlowSpec):

@step
def start(self):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('time is %s' % now)
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
DailyFlowArgo()
python schedule_flow_argo.py --with retry argo-workflows create
    Deploying dailyflowargo to Argo Workflows...
It seems this is the first time you are deploying dailyflowargo to Argo Workflows.

A new production token generated.

The namespace of this production flow is
production:dailyflowargo-0-hoxy
To analyze results of this production flow add this line in your notebooks:
namespace("production:dailyflowargo-0-hoxy")
If you want to authorize other people to deploy new versions of this flow to Argo Workflows, they need to call
argo-workflows create --authorize dailyflowargo-0-hoxy
when deploying this flow to Argo Workflows for the first time.
See "Organizing Results" at https://docs.metaflow.org/ for more information about production tokens.

Workflow dailyflowargo for flow DailyFlowArgo pushed to Argo Workflows successfully.

Note that the flow was deployed with a modified name due to Kubernetes naming conventions
on Argo Workflows. The original flow name is stored in the workflow annotation.

What will trigger execution of the workflow:
This workflow triggers automatically via the CronWorkflow dailyflowargo.

After running the above command your flow will be triggered daily!

3Manually Trigger Flow

You can manually trigger the flow at any time:

python schedule_flow_argo.py argo-workflows trigger
    Workflow dailyflowargo triggered on Argo Workflows (run-id argo-dailyflowargo-6gwjp).
See the run in the UI at https://ui.dev-content.outerbounds.xyz/p/default/DailyFlowArgo/argo-dailyflowargo-6gwjp
Outerbounds user note

On Outerbounds, you can view your Argo Workflows by clicking the "Deployments" tab in the navigation. Self-hosted Metaflow users can visualize and monitor the flow through the standard Argo UI.

Further Reading