Dolt with Popular DataFrame Tools
Dolt is a version-controlled SQL database. For data science (DS) workflows specifically, Dolt's data versioning primitives enable unique flavors of reproducibility.
DataFrames are a common interface for exploring CSV, Parquet and other file types. Combining the ergonomics of DS tools with the performance, scalability and reliability of databases can yield fantastic results, exhibited most famously by Uber's Michelangelo platform. At the same time, synchronizing legacy and bleeding-edge data tools is difficult. Today we will load Dolt data into three types of DataFrames to show how near at hand production-ready and reproducible machine learning is with the tools you already use every day.
0: Installing Dolt
All examples require the dolt binary to be installed separately:
> sudo bash -c 'curl -L https://github.com/dolthub/dolt/releases/latest/download/install.sh | sudo bash'
1: Pandas DataFrames
Pandas integrates SQL natively and can generate its own DataFrames given a valid SQL server connection.
A handful of dependencies are required for Pandas-to-MySQL database access:
> pip install pandas sqlalchemy pymysql
Exposing Dolt's SQL interface requires running a server in the background that Pandas can communicate with (refer to the CLI vs SQL interface article for a refresher):
> mkdir -p test_db
> cd test_db
> dolt init
> dolt sql-server -l trace
Starting server with Config HP="localhost:3306"|U="root"|P=""|T="28800000"|R="false"|L="trace"
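Before wiring up Pandas, it is worth confirming the server is reachable. A minimal sketch using pymysql directly (the database name defaults to the repository directory, test_db in our case):

import pymysql

# Connect to the dolt sql-server started above.
conn = pymysql.connect(host="localhost", port=3306, user="root", database="test_db")
with conn.cursor() as cursor:
    cursor.execute("SELECT active_branch()")  # Dolt exposes the checked-out branch in SQL
    print(cursor.fetchone())
conn.close()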
Pandas can now load keyless tables into Dolt:
In [1]: import pandas as pd
   ...: from sqlalchemy import create_engine
   ...: engine = create_engine("mysql+pymysql://root@localhost/test_db", echo=False)
In [2]: with engine.begin() as connection:
   ...:     df = pd.DataFrame({"name" : ["User 4", "User 5"]})
   ...:     df.to_sql("users", con=connection, index=False, if_exists="append")
In [3]: engine.execute("SELECT * FROM users").fetchall()
Out[3]: [(None, 'User 4'), (None, 'User 5')]
Normally we use dolt's command-line interface to persist commits to version control, but we can use the SQL session to do the same thing:
In [4]: engine.execute("SELECT DOLT_ADD('users')").fetchall()
Out[4]: [(0,)]
In [5]: engine.execute("SELECT DOLT_COMMIT('-m', 'Add users table')").fetchall()
Out[5]: [('03562lom2sobtv21t28o7u49kpa5hg7e',)]
In [6]: with engine.begin() as connection:
   ...:     branches = pd.read_sql("select * from dolt_branches;", con=connection)
   ...: branches
Out[6]:
name hash latest_committer latest_committer_email latest_commit_date latest_commit_message
0 master 03562lom2sobtv21t28o7u49kpa5hg7e Bojack Horseman bojack@horseman.com 2021-03-20 17:14:57.094 Add users table
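The commit hash doubles as a reproducibility pin: Dolt's AS OF syntax lets any query read a table exactly as it existed at a commit. A minimal sketch reusing the hash returned in Out[5] (AS OF also accepts branch names and timestamps):

# Read the users table exactly as it existed at the commit created above.
with engine.begin() as connection:
    snapshot = pd.read_sql(
        "SELECT * FROM users AS OF '03562lom2sobtv21t28o7u49kpa5hg7e'",
        con=connection)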
2: Dask DataFrames
Dask extends Pandas to special use cases. Out-of-memory DataFrames facilitate batching and/or distributed computing over large datasets. Dask copies the Pandas DataFrame interface to make the transition smooth for Python users.
To get started with Dask, first install its dataframe extension:
> pip install "dask[dataframe]"
We will use the hospital price transparency dataset for this example, which at ~20 GB is too big for Pandas to load into memory (and takes 15-30 minutes to download):
> dolt clone dolthub/hospital-price-transparency
Similar to example (1), we expose dolt as a sql-server that Dask understands how to access:
> cd hospital-price-transparency
> dolt sql-server -l trace
Starting server with Config HP="localhost:3306"|U="root"|P=""|T="28800000"|R="false"|L="trace"
Dask's interface should feel familiar after example (1).
In [1]: import dask.dataframe as dd
In [2]: conn = "mysql+pymysql://root@localhost/hospital_price_transparency"
In [3]: df = dd.read_sql_table(
   ...:     table="prices",
   ...:     uri=conn,
   ...:     index_col="code",
   ...:     divisions=list('acegikmoqsuwz'))
In [4]: df.head(npartitions=5)
Out[4]:
npi_number payer price code__1
code
c9132 1932148947 Anthem Commercial Out of Network IP RATE 3407.10 c9132
c9132 1932148947 Aetna HMO/PPO OP RATE 3116.25 c9132
c9132 1932148947 Alignment Medicare Adv_ HMO / PPO OP RATE 2.09 c9132
c9132 1932148947 Anthem Commercial Out of Network OP RATE 3656.40 c9132
c9132 1932148947 Anthem Medi-Cal OP RATE 10.83 c9132
Dask executed the query above with a where-clause partition, loading a subset of data to avoid memory bottlenecks. The trace logging of our dolt sql-server (the -l trace flag from earlier) reveals a bit about how that process works:
TRACE: received query SELECT prices.code, prices.npi_number, prices.payer, prices.price, prices.code AS code__1
FROM prices
WHERE prices.code >= 'i' AND prices.code < 'k'
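Because every partition maps to its own range query, computations over the full ~20 GB table never materialize it at once. A hedged sketch: the groupby below only builds a lazy task graph, and compute() pulls each code range through its own where-clause query before combining the partial results:

# Nothing is read until compute(); each partition is fetched,
# aggregated, and discarded in turn.
mean_price = df.groupby("payer").price.mean()
print(mean_price.compute())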
If we try to read the hospital data with Pandas and SQLAlchemy instead, dolt spins at its I/O limit until the client runs out of memory (unless your computer can comfortably load the whole dataset). I killed the process after letting it run for a few minutes:
...
TRACE: returning result row [VARCHAR("52640") CHAR("1922076603") VARCHAR("i_GOLDEN_RULE_INS_CO__S__1350_130146") DECIMAL(3784.00)]
TRACE: returning result row [VARCHAR("52640") CHAR("1922076603") VARCHAR("i_UMR__S__1350_170130") DECIMAL(3784.00)]
TRACE: returning result row [VARCHAR("33019") CHAR("1790717650") VARCHAR("TUFTS HEALTH PUBLIC PLANS [1013]") DECIMAL(5310.00)]
TRACE: returning result row [VARCHAR("33019") CHAR("1790717650") VARCHAR("UNICARE GIC [1015]") DECIMAL(5310.00)]
ERROR: Error in the middle of a stream to client 1 (127.0.0.1:51376): conn 1: Write(packet) failed: write tcp 127.0.0.1:3306->127.0.0.1:51376: write: broken pipe (errno 1105) (sqlstate HY000)
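If you are stuck with Pandas, chunked reads can sidestep the OOM at the cost of manual batching. A sketch under two assumptions: pandas' chunksize turns read_sql into an iterator of DataFrames, and SQLAlchemy's stream_results option keeps pymysql from buffering the entire result client-side (process() below is a hypothetical stand-in for your per-batch work):

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root@localhost/hospital_price_transparency")
# stream_results requests a server-side cursor so rows arrive in
# batches instead of one giant in-memory buffer.
with engine.connect().execution_options(stream_results=True) as connection:
    for chunk in pd.read_sql("SELECT * FROM prices", con=connection, chunksize=100_000):
        process(chunk)  # hypothetical: aggregate, filter, or write each batch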
3: Spark DataFrames
Spark is a distributed ETL platform written in Scala. Spark's compute model shares many of the same relational design principles as Dolt, and both execute a range of SQL commands. Unlike Dolt, Spark is not a storage engine and depends on external storage like S3 or HDFS. Spark also executes ETL jobs in a distributed master-executor model that works best with clusters of tens to thousands of nodes. With a little extra setup (compared to the first two examples) we will demonstrate how to load Spark DataFrame objects using Dolt and Pyspark.
An in-depth guide on how to acquire all of the dependencies for Spark is beyond the scope of this blog (this blog gets most of the way there), but the installation steps are loosely:
- Install Java 8
- Install Scala 2.11
- Install Spark 3.1.1
- Point SPARK_HOME and PATH to the new Spark libexec and libexec/bin, respectively
- Download the "platform independent" JDBC connector
- Unzip mysql-connector-java-8.0.23.jar and move it to your Spark jars folder (there are several options for updating Spark confs; this is just one)
- pip install pyspark
The "spark job" below accesses our dolt sql-server
to read and print data:
# dolt_pyspark.py
from pyspark.sql import SQLContext
import pyspark

if __name__ == "__main__":
    sc = pyspark.SparkContext('local[*]')
    sqlContext = SQLContext(sc)
    # Read the hospitals table over JDBC from the running dolt sql-server
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/hospital_price_transparency",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="hospitals",
        user="root",
        password="").load()
    df.show()
We add two MySQL driver jar flags to spark-submit before running:
> spark-submit \
--driver-class-path=/usr/local/Cellar/apache-spark/3.1.1/libexec/jars/mysql-connector-java-8.0.23.jar \
--jars=/usr/local/Cellar/apache-spark/3.1.1/libexec/jars/mysql-connector-java-8.0.23.jar \
dolt_pyspark.py
...
+------------+--------------------+--------------------+--------------------+---------------+-----+----------+------------+
| npi_number| name| url| street_address| city|state| zip_code|publish_date|
+------------+--------------------+--------------------+--------------------+---------------+-----+----------+------------+
|1003139775.0| HCA Virginia|https://hcavirgin...|901 E. Cary St Su...| Richmond| VA| null| 2021-01-01|
| 1003260480|Brookwood Baptist...|https://www.brook...|2010 Brookwood Me...| Birmingham| AL| 35209| null|
| 1003281452| Henderson Hospital|https://uhsfilecd...|1050 West Galleri...| Henderson| NV| 89011| 2021-01-01|
| 1003362997|CHI Health St. El...|https://www.chihe...| 555 S. 70Th St.| Lincoln| NE| 68510| 2021-01-01|
| 1003389206|Merrill pioneer h...|https://www.avera...|1100 S 10th Ave, ...| Rock Rapids| IA|51246-2020| null|
| 1003811290| Providence Health|http://www.yourpr...| 2435 Forest Drive| Columbia| SC| 29204| null|
| 1003833013|ASCENSION SETON H...|https://healthcar...| 3201 S WATER ST| BURNET| TX|78611-4510| null|
|1003853185.0| Dominion Hospital|https://dominionh...|901 E. Cary St Su...| Richmond| VA| null| 2021-01-01|
| 1003858408|Frisbie Memorial ...|https://frisbieho...| 11 Whitehall Road| Rochester| NH| 03867| 2021-01-01|
|1003858408.0|Frisbie Memorial ...|https://frisbieho...| 11 Whitehall Road| Rochester| NH| 3867.0| 2021-01-01|
|1003862053.0|Doctors Hospital ...|https://doctorsof...| 5731 Bee Ridge Rd| Sarasota| FL| 34233.0| 2021-01-01|
| 1003865825|WILLIAM BEAUMONT ...|https://www.beaum...| 3601 W 13 MILE RD| ROYAL OAK| MI|48073-6712| 2021-01-16|
| 1003873225|The Specialty Hos...|https://www.rushh...| 1314 19th Ave| Meridian| MS|39301-4116| null|
| 1003887266|Avera Missouri Ri...|https://www.avera...| 606 E Garfield Ave| Gettysburg| SD|57442-1325| null|
| 1003905092| Kula Hospital|https://res.cloud...| 628 Seventh Street| Lanai| HI| 96763| null|
|1003908443.0|Mercy Miami Hospital|https://mercymiam...| 3663 S Miami Ave| Miami| FL| 33133.0| 2021-01-01|
| 1013017839|ALEXIAN BROTHERS ...|https://healthcar...|1650 MOON LAKE BO...|HOFFMAN ESTATES| IL| 60169| null|
| 1013062769| Zion Medical Center|https://healthy.k...| 4647 Zion Ave| San Diego| CA| 92120| 2021-01-01|
| 1013085083|Baylor Scott & Wh...|https://www.bswhe...| 546 N Kegley Rd| Temple| TX| 76502| null|
| 1013100692| Abrazo West Campus|https://www.abraz...|13677 West McDowe...| Goodyear| AZ| 85395| null|
+------------+--------------------+--------------------+--------------------+---------------+-----+----------+------------+
only showing top 20 rows
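Spark can also mirror Dask's partitioned reads. A hedged sketch using the predicates argument of DataFrameReader.jdbc, run inside the same script as above: each predicate becomes its own WHERE clause, and Spark schedules one read task per range (the code boundaries here are illustrative):

# One Spark task per predicate, each issuing its own range query
# against the dolt sql-server.
predicates = [
    "code >= 'a' AND code < 'i'",
    "code >= 'i' AND code < 'q'",
    "code >= 'q'",
]
prices = sqlContext.read.jdbc(
    url="jdbc:mysql://localhost:3306/hospital_price_transparency",
    table="prices",
    predicates=predicates,
    properties={"user": "root", "password": "",
                "driver": "com.mysql.cj.jdbc.Driver"})
print(prices.rdd.getNumPartitions())  # 3, one per predicate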
Conclusion
Dolt combines a familiar SQL interface with Git-like version control. That combination gives data scientists and engineers powerful tools for defining reproducible data pipelines. Implementing a widely adopted SQL protocol allows Dolt to drop into existing tools used by the data science community.
Every week Dolt adds new support for tools used by engineers and scientists. If you work with a tool or product that you think may benefit from versioning, merging, diffing, logging, or some other form of lineage and reproducibility, reach out to us on our Discord. We would love to hear about your experiences with data applications.