Load CSV Data in Metaflow Steps

Question

I have a CSV and want to access it in a Metaflow flow. How can I read this data into tasks and write it to disk?

Solution

You can read a CSV into the flow using IncludeFile, which makes the data accessible to both local and remote steps in the flow.
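As a minimal sketch of the pattern (the flow name MinimalCSVFlow here is just for illustration; the complete example appears in step 2 below), IncludeFile reads the file when the run starts and exposes its contents to every step as a string:

from metaflow import FlowSpec, step, IncludeFile

class MinimalCSVFlow(FlowSpec):

    # The contents of ./movies.csv are read once and versioned with the run.
    data = IncludeFile("data", default="./movies.csv")

    @step
    def start(self):
        # self.data holds the raw CSV text, on local and remote compute alike.
        print(self.data[:50])
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    MinimalCSVFlow()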

1. Acquire CSV

This example uses a CSV from the Metaflow tutorials, downloaded by the save_data_locally function defined outside of the flow.

2. Run Flow

This flow shows how to:

  • Include a CSV saved locally for all steps in the flow.
  • Add a feature to each data point.
  • Save the new data as a flow artifact.
load_csv_data.py
from metaflow import FlowSpec, step, IncludeFile
import pandas as pd

def save_data_locally():
    # Download the tutorial CSV and save a local copy for IncludeFile to pick up.
    url = "https://raw.githubusercontent.com/" + \
          "Netflix/metaflow/master/metaflow"
    data_path = "/tutorials/02-statistics/movies.csv"
    local_path = "./movies.csv"
    df = pd.read_csv(url + data_path)
    df.to_csv(local_path)

class CSVFlow(FlowSpec):

    # IncludeFile reads ./movies.csv at the start of the run and makes its
    # contents available to every step as self.data.
    data = IncludeFile("data", default="./movies.csv")

    @step
    def start(self):
        self.next(self.use_csv)

    @step
    def use_csv(self):
        import pandas as pd
        from io import StringIO

        # Parse the included CSV text and add a boolean feature column.
        df = pd.read_csv(StringIO(self.data), index_col=0)
        f = lambda x: x < 2000
        df["is_before_2000"] = df["title_year"].apply(f)

        # Store the augmented DataFrame as a flow artifact.
        self.new_df = df
        self.next(self.end)

    @step
    def end(self):
        result = self.new_df.is_before_2000.sum()
        print(f"Number of pre-2000 movies is {result}.")

if __name__ == "__main__":
    save_data_locally()
    CSVFlow()
python load_csv_data.py run
    ...
[1654221300950244/end/3 (pid 71595)] Task is starting.
[1654221300950244/end/3 (pid 71595)] Number of pre-2000 movies is 1023.
[1654221300950244/end/3 (pid 71595)] Task finished successfully.
...
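Because IncludeFile behaves like a flow parameter, the file to include can also be chosen at run time instead of relying on the default path. For example, assuming another CSV exists at ./other_movies.csv (a hypothetical path):

python load_csv_data.py run --data ./other_movies.csv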

3. Access Artifacts Outside of Flow

The following snippet can be run in any script or notebook to access the contents of the DataFrame that was stored as the flow artifact self.new_df.

from metaflow import Flow 
run = Flow("CSVFlow").latest_run
assert run.successful
run.data.new_df.is_before_2000.sum()
    1023
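latest_run always points to the most recent execution of CSVFlow. To pin the analysis to a particular execution, a Run can be looked up by its pathspec instead, for example with the run ID shown in the log output above:

from metaflow import Run
run = Run("CSVFlow/1654221300950244")
run.data.new_df.is_before_2000.sum()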

Further Reading