
Load Parquet Data from S3 to Arrow Table

Question

I have a Parquet dataset stored in AWS S3 and want to access it in a Metaflow flow. How can I read one or several Parquet files at once from a flow and use them in an Arrow table?

Solution

You can access a Parquet dataset on S3 from a Metaflow flow using metaflow.S3, Metaflow's built-in S3 client. It lets you download and upload large datasets from S3 with high throughput.

1. Access Parquet Data in S3

We recommend using metaflow.S3 as a context manager. Note that metaflow.S3 stores downloaded objects as temporary files that exist only for the duration of the context. Anything you need from a downloaded file must therefore be read, or copied elsewhere, before the scope closes.
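The lifetime rule can be illustrated with the standard library alone. The sketch below is only an analogy: a `tempfile.TemporaryDirectory` stands in for the scratch space that metaflow.S3 manages, and the file name `chunk.parquet` is made up for the illustration. It is not how Metaflow is implemented.

```python
import os
import tempfile

# Analogy only: TemporaryDirectory plays the role of the scratch space
# that metaflow.S3 cleans up when its context exits.
with tempfile.TemporaryDirectory() as scratch:
    path = os.path.join(scratch, "chunk.parquet")  # hypothetical file name
    with open(path, "wb") as f:
        f.write(b"placeholder bytes")

    # Read what you need while the context is still open.
    with open(path, "rb") as f:
        data = f.read()
    exists_inside = os.path.exists(path)

# The path string survives, but the file behind it is gone.
exists_after = os.path.exists(path)
print(exists_inside, exists_after)  # True False
```

The in-memory `data` stays usable after the block exits, which is the same reason the flow below reads the Parquet file into a table inside the `with` block.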

To access a single file, use metaflow.S3.get. Parquet datasets often consist of many files, which is a good use case for Metaflow's s3.get_many function.

The example flow in the next section uses Parquet data stored in S3 from Ookla Global's AWS Open Data Submission.
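The difference between the two calls can be sketched as follows. The helper names `read_one` and `read_many` are hypothetical. The imports are placed inside the functions so the snippet loads even where Metaflow or PyArrow is not installed. Running it requires both libraries and read access to the bucket.

```python
def read_one(url):
    """Download a single object by its full s3:// URL and read it as a table."""
    import pyarrow.parquet as pq
    from metaflow import S3

    with S3() as s3:
        obj = s3.get(url)
        # obj.path points at a temporary local file, so read it
        # before the context closes.
        return pq.read_table(obj.path)


def read_many(s3root, keys):
    """Download several objects under a common root and read each as a table."""
    import pyarrow.parquet as pq
    from metaflow import S3

    with S3(s3root=s3root) as s3:
        objs = s3.get_many(keys)  # keys are relative to s3root
        # Key the result by obj.key rather than relying on list order.
        return {obj.key: pq.read_table(obj.path) for obj in objs}
```

Both helpers return Arrow tables that live in memory, so the results remain valid after the temporary files are removed.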

2. Run Flow

This flow shows how to:

  • Download multiple Parquet files using Metaflow's s3.get_many function.
  • Read the result of the first dataset chunk as a PyArrow table.

load_parquet_to_arrow.py

from metaflow import FlowSpec, step, S3

BASE_URL = 's3://ookla-open-data/' + \
    'parquet/performance/type=fixed/'
YEARS = ['2019', '2020', '2021', '2022']
# Keys are relative to BASE_URL: one first-quarter file per year.
S3_PATHS = [
    f'year={y}/quarter=1/{y}-' + \
    '01-01_performance_fixed_tiles.parquet'
    for y in YEARS
]

class ParquetArrowFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.load_parquet)

    @step
    def load_parquet(self):
        import pyarrow.parquet as pq
        with S3(s3root=BASE_URL) as s3:
            tmp_data_path = s3.get_many(S3_PATHS)
            first_path = tmp_data_path[0].path
            # Read inside the context: the temporary file
            # is removed when the scope closes.
            self.table = pq.read_table(first_path)
        self.next(self.end)

    @step
    def end(self):
        print('Table for first year ' + \
              f'has shape {self.table.shape}.')

if __name__ == '__main__':
    ParquetArrowFlow()

python load_parquet_to_arrow.py run
    ...
    [637/end/3308 (pid 7081)] Task is starting.
    [637/end/3308 (pid 7081)] Table for first year has shape (4877036, 7).
    [637/end/3308 (pid 7081)] Task finished successfully.
    ...
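
The S3_PATHS list in the flow hard-codes the first quarter of each year. A small helper can build the same keys for other quarters. This is a sketch only. The function name `tile_keys` is hypothetical, and it assumes the other quarters follow the same naming pattern as the first-quarter keys, with the file date set to the first day of the quarter. Verify that against the bucket listing before relying on it.

```python
def tile_keys(years, quarter=1):
    """Build keys relative to the type=fixed prefix (hypothetical helper)."""
    if quarter not in (1, 2, 3, 4):
        raise ValueError("quarter must be 1, 2, 3 or 4")
    # Assumption: files are dated the first day of the quarter (months 1, 4, 7, 10).
    month = 3 * (quarter - 1) + 1
    return [
        f"year={y}/quarter={quarter}/"
        f"{y}-{month:02d}-01_performance_fixed_tiles.parquet"
        for y in years
    ]


print(tile_keys(["2019"])[0])
# year=2019/quarter=1/2019-01-01_performance_fixed_tiles.parquet
```

With `quarter=1` the helper reproduces the S3_PATHS list from the flow, so it can be passed to `s3.get_many` in the same way.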

3. Access Artifacts Outside of Flow

The following can be run in any script or notebook to access the contents of the table that was stored as a flow artifact with self.table. You can also run quick tests to assert the artifacts have expected properties.

from metaflow import Flow
run = Flow('ParquetArrowFlow').latest_run
table = run.data.table
assert run.successful
assert table.shape == (4877036, 7)
table.select([1,2,3,4,5])
    pyarrow.Table
    tile: string
    avg_d_kbps: int64
    avg_u_kbps: int64
    avg_lat_ms: int64
    tests: int64
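
The quick tests mentioned above can also be collected into a small reusable check. This is a sketch: `check_table` and `check_latest_run` are hypothetical names, and the expected shape and column names are taken from the output shown in this guide. `check_table` only relies on the `shape` and `column_names` attributes, which a PyArrow table provides.

```python
def check_table(table, expected_shape, expected_columns=None):
    """Return a list of problems; an empty list means the table looks right."""
    problems = []
    if table.shape != expected_shape:
        problems.append(f"shape {table.shape} != expected {expected_shape}")
    if expected_columns is not None:
        missing = [c for c in expected_columns if c not in table.column_names]
        if missing:
            problems.append(f"missing columns: {missing}")
    return problems


def check_latest_run(flow_name="ParquetArrowFlow"):
    """Check the table artifact of the latest run (needs Metaflow and a prior run)."""
    from metaflow import Flow  # imported lazily so check_table works without Metaflow

    run = Flow(flow_name).latest_run
    if not run.successful:
        return ["latest run did not finish successfully"]
    return check_table(run.data.table, (4877036, 7), ["tile", "tests"])
```

Returning a list of problems instead of asserting immediately reports every mismatch in one pass, which is convenient in a notebook.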

Further Reading