Run SQL Query with Pandas
Question
How do I query a database with SQL and load the results into Pandas?
Solution
Pandas has utility functions that make it one line to create a table and store it in a database, and later run queries against the data. This page will show you how to run a SQL query against a self-hosted database.
1Add Table to MySQL DB
To run the full example locally you will need to install MySQL and set up a database called test
. This example uses a Python function defined in the script containing the flow to create the table but you could set up the table any way you prefer to interact with the database.
2Run Flow
The flow shows how to:
- Access data in a Pandas dataframe by running a SQL query on a local database.
- In this example a MySQL database is used, but you could also store data in PostgreSQL for this example.
- Make a transformation to the dataframe.
- Save the result to a separate table in the database.
sql_query_local.py
from metaflow import FlowSpec, step, Parameter
from sqlalchemy import create_engine
import pandas as pd
class LocalQueryFlow(FlowSpec):
@step
def start(self):
self.next(self.extract)
@step
def extract(self):
QUERY = f"SELECT * FROM {table_name}"
self.result = pd.read_sql(QUERY, con=conn)
self.next(self.transform)
@step
def transform(self):
f = lambda x: x["feat_1"] + x["feat_2"]
self.result["feat_12"] = self.result.apply(f,
axis=1)
self.next(self.write)
@step
def write(self):
self.result.to_sql(name=f"{table_name}_updated",
con=conn,
if_exists="replace")
self.next(self.end)
@step
def end(self):
conn.close()
### local database configuration ###
db_path = 'mysql://root:pass@localhost/data'
table_name = 'data'
engine = create_engine(db_path, echo=False)
conn = engine.connect()
def create_table(db_path, table, conn):
# create dataset
dataset = pd.DataFrame({"id": [1, 2],
"feat_1": ["foo", "bar"],
"feat_2": ["fizz", "buzz"]})
try: # write contents to local db
dataset.to_sql(table, con=conn)
except ValueError:
print(f"{table} at {db_path} doesn't exist.")
if __name__ == "__main__":
create_table(db_path, table_name, conn)
LocalQueryFlow()
python sql_query_local.py run
3Access Artifacts Outside of Flow
The following can be run in any script or notebook to access the contents of the DataFrame that was stored as a flow artifact with self.result
.
from metaflow import Flow
run = Flow('LocalQueryFlow').latest_run
run.data.result
index | id | feat_1 | feat_2 | feat_12 | |
---|---|---|---|---|---|
0 | 0 | 1 | foo | fizz | foofizz |
1 | 1 | 2 | bar | buzz | barbuzz |
Further Reading
- Loading and storing data with Metaflow