Skip to main content

Run SQL Query with Pandas

Question

How do I query a database with SQL and load the results into Pandas?

Solution

Pandas has utility functions that make it one line to create a table and store it in a database, and later run queries against the data. This page will show you how to run a SQL query against a self-hosted database.

1Add Table to MySQL DB

To run the full example locally you will need to install MySQL and set up a database called test. This example uses a Python function defined in the script containing the flow to create the table but you could set up the table any way you prefer to interact with the database.

2Run Flow

The flow shows how to:

  • Access data in a Pandas dataframe by running a SQL query on a local database.
    • In this example a MySQL database is used, but you could also store data in PostgreSQL for this example.
  • Make a transformation to the dataframe.
  • Save the result to a separate table in the database.
sql_query_local.py
from metaflow import FlowSpec, step, Parameter
from sqlalchemy import create_engine
import pandas as pd

class LocalQueryFlow(FlowSpec):

@step
def start(self):
self.next(self.extract)

@step
def extract(self):
QUERY = f"SELECT * FROM {table_name}"
self.result = pd.read_sql(QUERY, con=conn)
self.next(self.transform)

@step
def transform(self):
f = lambda x: x["feat_1"] + x["feat_2"]
self.result["feat_12"] = self.result.apply(f,
axis=1)
self.next(self.write)

@step
def write(self):
self.result.to_sql(name=f"{table_name}_updated",
con=conn,
if_exists="replace")
self.next(self.end)

@step
def end(self):
conn.close()

### local database configuration ###
db_path = 'mysql://root:pass@localhost/data'
table_name = 'data'
engine = create_engine(db_path, echo=False)
conn = engine.connect()

def create_table(db_path, table, conn):
# create dataset
dataset = pd.DataFrame({"id": [1, 2],
"feat_1": ["foo", "bar"],
"feat_2": ["fizz", "buzz"]})
try: # write contents to local db
dataset.to_sql(table, con=conn)
except ValueError:
print(f"{table} at {db_path} doesn't exist.")

if __name__ == "__main__":
create_table(db_path, table_name, conn)
LocalQueryFlow()
python sql_query_local.py run
     Workflow starting (run-id 610):
[610/start/3177 (pid 73064)] Task is starting.
[610/start/3177 (pid 73064)] Task finished successfully.
[610/extract/3178 (pid 73069)] Task is starting.
[610/extract/3178 (pid 73069)] Task finished successfully.
[610/transform/3179 (pid 73073)] Task is starting.
[610/transform/3179 (pid 73073)] Task finished successfully.
[610/write/3180 (pid 73082)] Task is starting.
[610/write/3180 (pid 73082)] Task finished successfully.
[610/end/3181 (pid 73086)] Task is starting.
[610/end/3181 (pid 73086)] Task finished successfully.
Done!

3Access Artifacts Outside of Flow

The following can be run in any script or notebook to access the contents of the DataFrame that was stored as a flow artifact with self.result.

from metaflow import Flow
run = Flow('LocalQueryFlow').latest_run
run.data.result
index id feat_1 feat_2 feat_12
0 0 1 foo fizz foofizz
1 1 2 bar buzz barbuzz

Further Reading