
Chunk a Dataframe to Parquet

Question

I have a large pandas dataframe in memory. How can I chunk it into Parquet files using Metaflow?

Solution

You can do this using Metaflow's foreach. A useful pattern is to combine Apache Arrow's zero-copy slicing with Metaflow's foreach, so that each parallel task writes one chunk of the table as a Parquet file.

1. Gather data

Suppose you have curated a dataset:

import numpy as np
import pandas as pd
import string
from datetime import datetime

letters = list(string.ascii_lowercase)
make_str = lambda n: ''.join(np.random.choice(letters, size=n))
dates = pd.date_range(start=datetime(2010,1,1),
                      end=datetime.today(),
                      freq="min")
size = len(dates)
df = pd.DataFrame({
    'date': dates,
    'num1': np.random.rand(size),
    'num2': np.random.rand(size),
    'str1': [make_str(20) for _ in range(size)],
    'str2': [make_str(20) for _ in range(size)]
})

df.to_csv("./large_dataframe.csv")
df.head(3)
date num1 num2 str1 str2
0 2010-01-01 00:00:00 0.424410 0.503014 xyouzjaivrwtnqczcieb fonxhwjxdpdvnfvtvcar
1 2010-01-01 00:01:00 0.650159 0.184204 dxrqtbmezgwobpqlpybt ihahasnbtgptjfwnvlic
2 2010-01-01 00:02:00 0.602216 0.647338 kaatnygdfekoxmpnvbky wffzxlyzjnopahttvdxe

and your goal is to store this data efficiently in Parquet files.
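If a single file is enough, pandas can write Parquet directly. Here is a minimal baseline sketch (not part of the flow; the output filename is just an example) before we split the write across parallel Metaflow tasks:

# baseline: write the whole dataframe above to one local Parquet file
df.to_parquet("./large_dataframe.parquet", engine="pyarrow")

Chunking becomes useful when you want several smaller files or want to parallelize the transformation and the write.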

2. Determine How to Chunk the Data

PyArrow version 5.0.0 is used to split the dataframe into chunks. You can see how in the utility function below, which the flow in the next step imports:

dataframe_utils.py
import pyarrow as pa
import pandas as pd
from datetime import datetime
from typing import List, Tuple

def get_chunks(df: pd.DataFrame = None,
               num_chunks: int = 4) -> Tuple[pa.Table, List]:
    # parse the year out of the date string (read_csv loads dates as strings)
    get_year = lambda x: datetime.strptime(
        x.split()[0], "%Y-%m-%d").year
    df['year'] = df.date.apply(get_year)
    # split the rows into num_chunks equal pieces;
    # the last chunk absorbs any remainder
    num_records = df.shape[0] // num_chunks
    lengths = [num_records] * num_chunks
    lengths[-1] += df.shape[0] - num_chunks*num_records
    offsets = [sum(lengths[:i]) for i in range(num_chunks)]
    names = ["chunk_%s" % i for i in range(num_chunks)]
    # return the full Arrow table plus (name, offset, length) chunk metadata
    return (pa.Table.from_pandas(df),
            list(zip(names, offsets, lengths)))
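To see what get_chunks returns before wiring it into a flow, you can run a small sketch like the following (it assumes the large_dataframe.csv created in step 1):

import pandas as pd
from dataframe_utils import get_chunks

df = pd.read_csv("./large_dataframe.csv")
table, chunks = get_chunks(df, num_chunks=4)

# each entry is a (name, offset, length) triple that a foreach task can
# turn into a zero-copy view with table.slice(offset, length)
for name, offset, length in chunks:
    print(name, offset, length)

# the chunks tile the table: the lengths sum to the total row count
assert sum(length for _, _, length in chunks) == table.num_rows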

3. Run Flow

This flow loads the CSV into a pandas dataframe and applies the following steps:

  • Use pyarrow.Table.from_pandas to load the data into Arrow memory.
  • In parallel branches:
    • Use pyarrow.Table.slice to make zero-copy views of chunks of the table.
    • Apply a transformation to the table; in this case, appending a column.
    • Move the chunks to your S3 bucket using pyarrow.parquet.write_table.
  • Pick a chunk and verify the existence of the new transformed column.

If you have a dataframe in S3 that you want to read into memory, you can see an example specific to this topic here.

chunk_dataframe.py
from metaflow import FlowSpec, step

class ForEachChunkFlow(FlowSpec):

    bucket = "s3://outerbounds-how-tos"
    s3_path = "{}/dataframe-chunks/{}.parquet"
    df_path = "./large_dataframe.csv"

    @step
    def start(self):
        import pandas as pd
        from dataframe_utils import get_chunks
        my_big_df = pd.read_csv(self.df_path)
        self.table, self.chunks = get_chunks(my_big_df)
        self.next(self.process_chunk, foreach='chunks')

    @step
    def process_chunk(self):
        import pyarrow as pa
        import pyarrow.parquet as pq

        # get view of this chunk only
        chunk_id, offset, length = self.input
        chunk = self.table.slice(offset=offset, length=length)

        # do transformation on table
        col1 = chunk['num1'].to_numpy()
        col2 = chunk['num2'].to_numpy()
        values = pa.array(col1 * col2)
        chunk = chunk.append_column('new col', values)

        # write chunk as parquet file in S3 bucket
        self.my_path = self.s3_path.format(self.bucket, chunk_id)
        pq.write_table(table=chunk, where=self.my_path)
        self.next(self.join)

    @step
    def join(self, inputs):
        # nothing to merge here; each chunk file is already in S3
        self.next(self.end)

    @step
    def end(self):
        import pyarrow.parquet as pq
        test_id = 'chunk_1'
        path = self.s3_path.format(self.bucket, test_id)
        test_chunk = pq.read_table(source=path)
        assert 'new col' in test_chunk.column_names

if __name__ == "__main__":
    ForEachChunkFlow()
python chunk_dataframe.py run
     Workflow starting (run-id 1658839758360594):
[1658839758360594/start/1 (pid 65431)] Task is starting.
[1658839758360594/start/1 (pid 65431)] Foreach yields 4 child steps.
[1658839758360594/start/1 (pid 65431)] Task finished successfully.
[1658839758360594/process_chunk/2 (pid 65447)] Task is starting.
[1658839758360594/process_chunk/3 (pid 65448)] Task is starting.
[1658839758360594/process_chunk/4 (pid 65449)] Task is starting.
[1658839758360594/process_chunk/5 (pid 65450)] Task is starting.
[1658839758360594/process_chunk/5 (pid 65450)] Task finished successfully.
[1658839758360594/process_chunk/4 (pid 65449)] Task finished successfully.
[1658839758360594/process_chunk/2 (pid 65447)] Task finished successfully.
[1658839758360594/process_chunk/3 (pid 65448)] Task finished successfully.
[1658839758360594/join/6 (pid 65592)] Task is starting.
[1658839758360594/join/6 (pid 65592)] Task finished successfully.
[1658839758360594/end/7 (pid 65595)] Task is starting.
[1658839758360594/end/7 (pid 65595)] Task finished successfully.
Done!
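
If you later want the chunks back in memory, you can read them individually or load the whole prefix at once. A minimal sketch, assuming the bucket and prefix used in the flow above, S3 credentials available to PyArrow, and no other files under that prefix:

import pyarrow.parquet as pq

# read a single chunk (the same call the end step uses)
chunk = pq.read_table("s3://outerbounds-how-tos/dataframe-chunks/chunk_1.parquet")

# read every chunk under the prefix back into one table / dataframe
full_table = pq.read_table("s3://outerbounds-how-tos/dataframe-chunks/")
full_df = full_table.to_pandas()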

Further Reading