
Filter Flows on Condition

Question

How can I filter the runs of my flows based on some condition?

Solution

You can use the Metaflow Client API to access the results of your flows.

1. Run Flow

This flow and the subsequent client API calls show how to:

  • Define a flow that has a conditional value saved as a flow artifact.
  • Filter runs of this flow by successful runs.
  • Filter runs of this flow by time.
  • Filter runs of this flow by a flow artifact value.

filter_on_condition.py

from metaflow import FlowSpec, step

class ConditionalFilterFlow(FlowSpec):

    @step
    def start(self):
        import random
        self.fancy_conditional = random.choice([1,2,3])
        print(self.fancy_conditional)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    ConditionalFilterFlow()

python filter_on_condition.py run

Workflow starting (run-id 1659645557151571):
[1659645557151571/start/1 (pid 20584)] Task is starting.
[1659645557151571/start/1 (pid 20584)] 3
[1659645557151571/start/1 (pid 20584)] Task finished successfully.
[1659645557151571/end/2 (pid 20587)] Task is starting.
[1659645557151571/end/2 (pid 20587)] Task finished successfully.
Done!

2. Filter by Successful Runs

Here is an example using the Client API to access the results of the run you just did. Note that Flow(flow_name) returns an object that can be iterated over to yield the flow's runs, most recent first. You may also want to cap the number of runs that are fetched.

from metaflow import Flow
flow_name = 'ConditionalFilterFlow'
max_runs = 100 # limit query in case of many flow runs
successful_runs = []
for i, run in enumerate(Flow(flow_name)):
    if i >= max_runs: # stop once max_runs runs have been inspected
        break
    if run.successful:
        successful_runs.append(run)
successful_runs
[Run('ConditionalFilterFlow/1659645557151571'),
 Run('ConditionalFilterFlow/1659645118452480'),
 Run('ConditionalFilterFlow/1658853090048097'),
 Run('ConditionalFilterFlow/1658852893601432'),
 Run('ConditionalFilterFlow/1658840520845011'),
 Run('ConditionalFilterFlow/1658731222406680'),
 Run('ConditionalFilterFlow/1658729896148442'),
 Run('ConditionalFilterFlow/1658729755179231'),
 Run('ConditionalFilterFlow/1658729474247844'),
 Run('ConditionalFilterFlow/1658729270057885'),
 Run('ConditionalFilterFlow/1658728960003690')]
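
The capped loop above can also be factored into a small reusable helper. The following is a minimal sketch and not part of the Metaflow API: filter_runs and the fake_runs stand-ins are hypothetical names introduced here so that the example runs without a Metaflow datastore. With Metaflow installed, you would presumably pass Flow(flow_name) as the first argument instead.

```python
from itertools import islice
from types import SimpleNamespace


def filter_runs(runs, predicate, max_runs=100):
    """Return the runs among the first `max_runs` items that satisfy `predicate`."""
    # islice stops pulling from the iterable after max_runs items,
    # so a long run history is never fully fetched.
    return [run for run in islice(runs, max_runs) if predicate(run)]


# Hypothetical stand-ins for Metaflow Run objects (most recent first).
fake_runs = [
    SimpleNamespace(id="3", successful=True),
    SimpleNamespace(id="2", successful=False),
    SimpleNamespace(id="1", successful=True),
]

# Only the first two runs are inspected, so run "1" is never reached.
successful = filter_runs(fake_runs, lambda run: run.successful, max_runs=2)
print([run.id for run in successful])  # prints ['3']
```

Passing the condition as a predicate keeps the cap logic in one place, so the same helper could serve the time-based and artifact-based filters as well.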

3. Filter by Time

You can use properties of the Metaflow Run to filter. This snippet shows how to use the datetime library to filter runs from January 1st of this year to today:

from metaflow import Flow
from datetime import datetime
flow_name = 'ConditionalFilterFlow'
now = datetime.now()
start_year, today = datetime(now.year, 1, 1), now
runs_this_year = []
max_runs = 100 # limit query in case of many flow runs
for i, run in enumerate(Flow(flow_name)):
    if i >= max_runs:
        break
    # created_at is a datetime, so it compares directly with the bounds
    if start_year < run.created_at < today:
        runs_this_year.append(run)
runs_this_year
[Run('ConditionalFilterFlow/1659645557151571'),
 Run('ConditionalFilterFlow/1659645118452480'),
 Run('ConditionalFilterFlow/1658853090048097'),
 Run('ConditionalFilterFlow/1658852893601432'),
 Run('ConditionalFilterFlow/1658840520845011'),
 Run('ConditionalFilterFlow/1658731222406680'),
 Run('ConditionalFilterFlow/1658729896148442'),
 Run('ConditionalFilterFlow/1658729755179231'),
 Run('ConditionalFilterFlow/1658729474247844'),
 Run('ConditionalFilterFlow/1658729270057885'),
 Run('ConditionalFilterFlow/1658728960003690')]
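
The time comparison can be isolated in a predicate, which makes the window boundaries explicit. This is a sketch under stated assumptions: created_between and fake_runs are hypothetical names, and the stand-in objects only mimic the created_at property used above. Unlike the strict comparison in the snippet above, this version includes a run created exactly at the start of the window.

```python
from datetime import datetime
from types import SimpleNamespace


def created_between(run, start, end):
    """True if run.created_at falls in the half-open window [start, end)."""
    return start <= run.created_at < end


# Hypothetical stand-ins for Metaflow Run objects.
fake_runs = [
    SimpleNamespace(id="a", created_at=datetime(2022, 8, 4, 13, 39)),
    SimpleNamespace(id="b", created_at=datetime(2022, 1, 1, 0, 0)),
    SimpleNamespace(id="c", created_at=datetime(2021, 12, 31, 23, 59)),
]

start, end = datetime(2022, 1, 1), datetime(2023, 1, 1)
in_2022 = [run.id for run in fake_runs if created_between(run, start, end)]
print(in_2022)  # prints ['a', 'b']
```

A half-open window lets consecutive periods (for example, one per year) be chained without counting any run twice.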

4. Filter by Flow Artifact

This snippet shows how to filter runs by the value of a flow artifact, here the fancy_conditional value saved in the start step. Runs for which the lookup raises a KeyError are counted separately.

from metaflow import Flow
flow_name = 'ConditionalFilterFlow'
runs_with_fancy_condition = []
num_runs_without_fancy_condition = 0
filter_value = 1
key_errors = 0
max_runs = 100 # limit query in case of many flow runs
for i, run in enumerate(Flow(flow_name)):
    if i >= max_runs:
        break
    try:
        if run['start'].task.data.fancy_conditional == filter_value:
            runs_with_fancy_condition.append(run)
        else:
            num_runs_without_fancy_condition += 1
    except KeyError:
        # counted when the artifact lookup raises KeyError for this run
        key_errors += 1
print("This flow has {} runs that have different fancy_conditional values != 1".format(
    num_runs_without_fancy_condition))
print("This flow has {} runs that do not have the fancy_conditional parameter".format(
    key_errors))
This flow has 5 runs that have different fancy_conditional values != 1
This flow has 4 runs that do not have the fancy_conditional parameter
for run in runs_with_fancy_condition:
    assert run.data.fancy_conditional == filter_value
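
The three-way bookkeeping above (matching value, different value, artifact missing) can be sketched as a single function. The names classify_runs, get_value, and fake_runs are hypothetical and introduced only for illustration. Catching AttributeError in addition to KeyError is an assumption made here for robustness; the snippet above catches only KeyError.

```python
from types import SimpleNamespace


def classify_runs(runs, get_value, wanted):
    """Split runs into matching runs, a mismatch count, and a missing count."""
    matching, mismatched, missing = [], 0, 0
    for run in runs:
        try:
            value = get_value(run)
        except (KeyError, AttributeError):
            # Assumption: a missing step or artifact surfaces as one of these.
            missing += 1
            continue
        if value == wanted:
            matching.append(run)
        else:
            mismatched += 1
    return matching, mismatched, missing


# Hypothetical stand-ins; the last one has no fancy_conditional attribute.
fake_runs = [
    SimpleNamespace(id="r3", fancy_conditional=1),
    SimpleNamespace(id="r2", fancy_conditional=3),
    SimpleNamespace(id="r1"),
]

matching, mismatched, missing = classify_runs(
    fake_runs, lambda run: run.fancy_conditional, wanted=1
)
print([run.id for run in matching], mismatched, missing)  # prints ['r3'] 1 1
```

With Metaflow, the accessor would presumably be lambda run: run['start'].task.data.fancy_conditional, as in the snippet above.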

Further Reading