Load CSV Data in Metaflow Steps
Question
I have a CSV and want to access it in a Metaflow flow. How can I read this data into tasks and write it to disk?
Solution
You can read a CSV into the flow using IncludeFile. This makes the data accessible for local and remote steps in the flow.
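IncludeFile snapshots the file when the run starts and hands each step its contents as an in-memory string rather than a file path. A minimal, Metaflow-free sketch of that round trip (the sample data below is illustrative):

```python
import csv
from io import StringIO

# An IncludeFile-style payload: the raw text of a CSV, not a filename.
data = "movie_title,title_year\nToy Story,1995\nAvatar,2009\n"

# Wrap the string in StringIO so csv (or pandas.read_csv) can treat it
# like a file object.
rows = list(csv.DictReader(StringIO(data)))
print(len(rows))              # 2 rows parsed from the in-memory string
print(rows[0]["title_year"])  # "1995"
```

This is why the flow below parses self.data with StringIO instead of passing a path to pandas.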
1. Acquire CSV
In this example, a CSV from the Metaflow tutorials is used. The download is done in the save_data_locally function defined outside of the flow.
2. Run Flow
This flow shows how to:
- Include a CSV saved locally for all steps in the flow.
- Add a feature to each data point.
- Save the new data as a flow artifact.
load_csv_data.py
from metaflow import FlowSpec, step, IncludeFile
import pandas as pd


def save_data_locally():
    # Download the tutorial CSV and save a local copy for IncludeFile to pick up.
    url = "https://raw.githubusercontent.com/" + \
          "Netflix/metaflow/master/metaflow"
    data_path = "/tutorials/02-statistics/movies.csv"
    local_path = "./movies.csv"
    df = pd.read_csv(url + data_path)
    df.to_csv(local_path)


class CSVFlow(FlowSpec):

    # IncludeFile snapshots the local CSV when the run starts and makes
    # its contents available to every step, local or remote.
    data = IncludeFile("data", default="./movies.csv")

    @step
    def start(self):
        self.next(self.use_csv)

    @step
    def use_csv(self):
        import pandas as pd
        from io import StringIO

        # self.data holds the raw CSV text, so wrap it in StringIO.
        df = pd.read_csv(StringIO(self.data), index_col=0)
        df["is_before_2000"] = df["title_year"].apply(lambda x: x < 2000)
        self.new_df = df  # stored as a flow artifact
        self.next(self.end)

    @step
    def end(self):
        result = self.new_df.is_before_2000.sum()
        print(f"Number of pre-2000 movies is {result}.")


if __name__ == "__main__":
    save_data_locally()
    CSVFlow()
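The use_csv step assumes the included file arrives as text, which is IncludeFile's default (is_text=True). If you include a binary file with is_text=False, the step receives bytes instead. A small hypothetical helper (as_text_buffer is my own name, and UTF-8 is an assumed encoding) that normalizes either case before parsing:

```python
from io import StringIO

def as_text_buffer(data, encoding="utf-8"):
    """Return a file-like text buffer for str or bytes CSV payloads."""
    if isinstance(data, bytes):
        # is_text=False payloads arrive as raw bytes; decode before parsing.
        data = data.decode(encoding)
    return StringIO(data)

# Either payload shape now works with csv or pandas readers:
print(as_text_buffer("a,b\n1,2\n").readline())   # "a,b\n"
print(as_text_buffer(b"a,b\n1,2\n").readline())  # "a,b\n"
```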
python load_csv_data.py run
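Because IncludeFile registers data as a flow parameter, the file path can also be overridden on the command line instead of relying on the ./movies.csv default (the alternate path below is illustrative):

```shell
python load_csv_data.py run --data ./some_other_movies.csv
```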
3. Access Artifacts Outside of Flow
The following can be run in any script or notebook to access the contents of the DataFrame that was stored as a flow artifact with self.new_df.
from metaflow import Flow
run = Flow("CSVFlow").latest_run
assert run.successful
run.data.new_df.is_before_2000.sum()
Further Reading
- Basics of Metaflow
- Working with cloud data using Metaflow