
Run SQL Query with AWS Athena

Question

How can I access data in S3 with a SQL query from my Metaflow flow?

Solution

You can query data from a Metaflow task the same way you normally interact with AWS from a Python script. In addition to AWS S3 for storage, two helpful tools for this are AWS Glue and AWS Athena. AWS Glue is a managed extract, transform, and load (ETL) service. AWS Athena is a serverless SQL service that allows you to run queries against Glue databases.
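Before building anything, you can confirm that awswrangler can reach your Glue catalog. This is a quick sanity-check sketch, assuming awswrangler is installed and AWS credentials are configured; it only lists what already exists:

import awswrangler as wr

# List the Glue databases visible to the current AWS credentials.
print(wr.catalog.databases())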

1. Add Parquet files to AWS Glue DB

create_glue_db.py
import pandas as pd
import awswrangler as wr

def create_db(database_name, bucket_uri, table_name):
    dataset = pd.DataFrame({
        "id": [1, 2],
        "feature_1": ["foo", "bar"],
        "feature_2": ["fizz", "buzz"]
    })

    try:
        # create an AWS Glue database to query S3 data
        wr.catalog.create_database(name=database_name)
    except wr.exceptions.AlreadyExists:
        # if the database already exists, skip this step
        print(f"{database_name} exists!")

    # store data in the AWS data lake
    # here we use .parquet files;
    # AWS Glue works with many other data formats
    _ = wr.s3.to_parquet(df=dataset,
                         path=f"{bucket_uri}/dataset/",
                         dataset=True,
                         database=database_name,
                         table=table_name)
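After create_db runs, you can read the files straight back from S3 to confirm the dataset landed where you expect. The sketch below assumes the default bucket URI used later in the flow; substitute your own:

import awswrangler as wr

# Read the .parquet dataset written by create_db back into a DataFrame.
df = wr.s3.read_parquet(path="s3://outerbounds-how-tos/dataset/", dataset=True)
print(df)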

2. Run Flow

This flow shows how to:

  • Access Parquet data with a SQL query using AWS Athena.
  • Transform a data set.
  • Write a pandas DataFrame to AWS S3 as .parquet files.
sql_query_athena.py
from metaflow import FlowSpec, step, Parameter
import awswrangler as wr
from create_glue_db import create_db

class AWSQueryFlow(FlowSpec):

    bucket_uri = Parameter("bucket_uri",
                           default="s3://outerbounds-how-tos")
    db_name = Parameter("database_name",
                        default="test_db")
    table_name = Parameter("table_name",
                           default="test_table")

    @step
    def start(self):
        create_db(self.db_name, self.bucket_uri,
                  self.table_name)
        self.next(self.query)

    @step
    def query(self):
        QUERY = f"SELECT * FROM {self.table_name}"
        result = wr.athena.read_sql_query(
            QUERY,
            database=self.db_name
        )
        self.dataset = result
        self.next(self.transform)

    @step
    def transform(self):
        concat = lambda row: row["feature_1"] + row["feature_2"]
        self.dataset["feature_12"] = self.dataset.apply(
            concat,
            axis=1
        )
        self.next(self.write)

    @step
    def write(self):
        path = f"{self.bucket_uri}/dataset/"
        _ = wr.s3.to_parquet(df=self.dataset,
                             mode="overwrite",
                             path=path,
                             dataset=True,
                             database=self.db_name,
                             table=self.table_name)
        self.next(self.end)

    @step
    def end(self):
        print("Database is updated!")

if __name__ == '__main__':
    AWSQueryFlow()
python sql_query_athena.py run
    ...
[106/query/481 (pid 8240)] Task is starting.
[106/query/481 (pid 8240)] Task finished successfully.
...
[106/transform/482 (pid 8244)] Task is starting.
[106/transform/482 (pid 8244)] Task finished successfully.
...
[106/write/483 (pid 8249)] Task is starting.
[106/write/483 (pid 8249)] Task finished successfully.
...
[106/end/484 (pid 8253)] Task is starting.
[106/end/484 (pid 8253)] Database is updated!
[106/end/484 (pid 8253)] Task finished successfully.
...
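Since bucket_uri, database_name, and table_name are defined as Metaflow Parameters, you can point the flow at your own bucket and Glue database from the command line (the values below are placeholders):

python sql_query_athena.py run \
    --bucket_uri s3://my-bucket \
    --database_name my_db \
    --table_name my_table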

3. Access Artifacts Outside of Flow

The following can be run in any script or notebook to access the contents of the DataFrame that was stored as a flow artifact with self.dataset.

from metaflow import Flow
run_data = Flow('AWSQueryFlow').latest_run.data
run_data.dataset
id feature_1 feature_2 feature_12
0 1 foo fizz foofizz
1 2 bar buzz barbuzz
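
Alternatively, because the write step updated the Glue table itself, you can query the same data with Athena from any script, independent of Metaflow. A sketch using the default database and table names:

import awswrangler as wr

# Query the table the flow just updated, outside of any Metaflow run.
df = wr.athena.read_sql_query("SELECT * FROM test_table", database="test_db")
print(df)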

Further Reading