Change Data Capture With Kedro and Dolt
Introduction
We are pleased to introduce a Kedro-Dolt plugin, a collaboration between QuantumBlack Labs and DoltHub designed to expand the data versioning abilities of data scientists and engineers. You will find this useful if you want to use data diffing to understand data changes and communicate them more effectively to business stakeholders. This post takes about 5 minutes to read, and by the end you will know how to use Kedro to store data in Dolt and diff workflow results.
What is Kedro?
Kedro is an open-source Python framework for creating reproducible, maintainable and modular data science code. It offers both quick prototyping and production scaling in a single tool. Code is written in Python, packaged into discrete "nodes," which can be combined into pipelines, whereupon Kedro organises their dependencies and execution order.
Data management is one area where Kedro stands out from other workflow managers. Kedro's "data catalog" is a registry of all data sources that the project can use. It encapsulates data source configuration, removing IO as a concern for data scientists.
Your datasets need to be registered so Kedro can load and use them. All Kedro projects have a conf/base/catalog.yml file, with each dataset described with the following information:
- File location (path)
- Parameters for the given dataset
- Type of data
- Versioning
Kedro supports a number of datasets out of the box, and you can also add support for any proprietary data format or filesystem in your pipeline.
As a quick example, consider the DataSet below, defined in our catalog.yml:
iris_data:
  type: pandas.CSVDataSet
  filepath: "data/01_raw/iris.csv"
Kedro connects iris_data as an input to the split_data node with the following declaration:
node(func=split_data, inputs=["iris_data"], ...)
At runtime, the data argument of split_data will now be loaded according to the dataset configuration for our node input:
def split_data(data: pd.DataFrame, ratio: float) -> Dict[str, Any]:
    ...
Cleanly delineating the concerns of data infrastructure and data science is important for delivering data pipelines. Nodes, data sources, and pipelines can be customised, composed and executed on a variety of backends with these simple building blocks.
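To make the separation between IO and node logic concrete, here is a minimal standard-library sketch. It is not Kedro's implementation: the CATALOG dictionary, the load_csv and run_node helpers, and the in-memory CSV are hypothetical stand-ins for catalog.yml and a real dataset class, and split_data is simplified to work on plain lists.

```python
import csv
import io
from typing import Any, Callable, Dict, List

# Hypothetical stand-in for data/01_raw/iris.csv so the sketch runs anywhere.
RAW_CSV = "sepal_length,sepal_width\n5.1,3.5\n4.9,3.0\n6.7,3.1\n7.2,3.6\n"


def load_csv(text: str) -> List[Dict[str, str]]:
    """Parse CSV text into a list of row dictionaries."""
    return list(csv.DictReader(io.StringIO(text)))


# A toy "catalog": dataset name -> zero-argument loader.
# Kedro's real data catalog is configured in YAML and is far richer.
CATALOG: Dict[str, Callable[[], Any]] = {
    "iris_data": lambda: load_csv(RAW_CSV),
}


def split_data(data: List[Dict[str, str]], ratio: float) -> Dict[str, Any]:
    """Node logic only: no file paths or connection strings appear here."""
    n_test = int(len(data) * ratio)
    return {"train": data[n_test:], "test": data[:n_test]}


def run_node(func: Callable[..., Any], inputs: List[str], **params: Any) -> Any:
    """Resolve each named input through the catalog, then call the node."""
    loaded = [CATALOG[name]() for name in inputs]
    return func(*loaded, **params)


result = run_node(split_data, inputs=["iris_data"], ratio=0.5)
print(len(result["train"]), len(result["test"]))
```

Swapping the loader behind "iris_data" for one that reads from a database would leave split_data untouched, which is the property the data catalog is meant to provide.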
What is Dolt?
Dolt is an SQL database with Git-like versioning. Engineers use Dolt in machine learning workflows to version tabular data in a production-hardened database.
When using Dolt, you don't need to produce new CSV files for every workflow run; instead, you can save deltas between runs. There is no need to copy files between folder namespaces, since you can branch from, and merge to, a source database. Instead of hunting for quality bugs with scripts and expectation suites, you can run causal analysis on row-level diffs between two data versions.
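As an illustration of what a row-level diff reports, the sketch below compares two versions of a small table keyed by a primary key. It only shows the idea: Dolt computes diffs inside its storage engine rather than with Python dictionaries, and the diff_rows function, the integer keys and the sample values are all made up for this example.

```python
from typing import Dict, List, Tuple

Row = Tuple[float, float]  # (sepal_length, sepal_width), illustrative only


def diff_rows(old: Dict[int, Row], new: Dict[int, Row]) -> List[Tuple[str, int]]:
    """Compare two table versions keyed by primary key."""
    changes = []
    for key in sorted(old.keys() | new.keys()):
        if key not in old:
            changes.append(("added", key))
        elif key not in new:
            changes.append(("removed", key))
        elif old[key] != new[key]:
            changes.append(("modified", key))
    return changes


# Two hypothetical workflow runs writing to the same table.
run_1 = {1: (5.1, 3.5), 2: (4.9, 3.0), 3: (6.7, 3.1)}
run_2 = {1: (5.1, 3.5), 2: (4.9, 3.1), 4: (7.2, 3.6)}

changes = diff_rows(run_1, run_2)
print(changes)
```

Unchanged rows (key 1 here) never appear in the result, which is why storing deltas between runs can be cheaper than writing a full copy of the data each time.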
Using the Kedro-Dolt Plugin
Setup
You can clone our project to follow along:
$ git clone git@github.com:dolthub/kedro-dolt-demo.git
Python dependencies are listed in src/requirements.txt:
pip install -r src/requirements.txt
This workflow was originally built using the Iris starter project. We will walk through our modifications shortly.
To create our database, first install dolt:
$ sudo bash -c 'curl -L https://github.com/dolthub/dolt/releases/latest/download/install.sh | bash'
We can create our database with an initialisation call:
$ dolt init
Successfully initialized dolt data repository.
and expose an sql-server at root@localhost:3306/kedro_dolt_demo by running the following line in a separate window:
$ dolt sql-server --max-connections=10 -l trace
Starting server with Config HP="localhost:3306"|U="root"|P=""|T="28800000"|R="false"|L="trace"
The catalog configuration in conf/base/catalog.yml shows how we pass this SQL connection as a credential to our data sources:
example_test_x:
  type: pandas.SQLTableDataSet
  table_name: example_test_x
  credentials:
    con: mysql+pymysql://root@localhost:3306/kedro_dolt_demo
  save_args:
    if_exists: replace
Plugin
Kedro uses pluggy-style Hooks to register callbacks during stages of a workflow's lifecycle.
We released our plugin as a PyPI package, kedro-dolt:
pip install kedro-dolt
from kedro_dolt import DoltHook
The Kedro-Dolt plugin defines before_pipeline_run and after_pipeline_run methods to hook into workflow executions. You can find more information on Kedro Hooks and the lifecycle stages that are exposed in the Kedro docs.
Before the pipeline starts, we can check out a database branch if --params branch:<value> is specified:
@hook_impl
def before_pipeline_run(self, run_params: Dict[str, Any]):
    if (
        "branch" in run_params["extra_params"] and
        run_params["extra_params"]["branch"] is not None
    ):
        self._branch = run_params["extra_params"]["branch"]
        self._original_branch = self._active_branch
        self._checkout_branch(self._branch)
The pipeline cleanup hook commits changes and restores the original database branch:
@hook_impl
def after_pipeline_run(self, run_params: Dict[str, Any]):
    commit_message = self._commit_message(run_params=run_params)
    self._commit(commit_message)
    if self._original_branch is not None:
        self._checkout_branch(self._original_branch)
A Dolt commit persists data in much the same way as a Git commit. Each commit holds a database root value, a message, a timestamp and a reference to its parent commit.
By default, the Kedro-Dolt plugin will record an execution's run id in the commit:
def _commit_message(self, run_params: Dict[str, Any]):
    return f"Update from kedro run: {run_params['run_id']}"
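The checkout, commit and restore sequence can be traced end to end with an in-memory stand-in. The FakeDoltHook class below is hypothetical: it mirrors the control flow of the hook methods shown above but records branch checkouts and commits in plain Python attributes instead of talking to a Dolt server, so it says nothing about how the plugin issues its SQL.

```python
from typing import Any, Dict, List, Optional, Tuple


class FakeDoltHook:
    """In-memory stand-in that mirrors the hook logic shown above.

    Hypothetical: nothing here connects to a database.
    """

    def __init__(self, active_branch: str = "master") -> None:
        self._active_branch = active_branch
        self._original_branch: Optional[str] = None
        self.commits: List[Tuple[str, str]] = []  # (branch, message)

    def _checkout_branch(self, branch: str) -> None:
        self._active_branch = branch

    def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        # Switch branches only when --params branch:<value> was supplied.
        branch = run_params["extra_params"].get("branch")
        if branch is not None:
            self._original_branch = self._active_branch
            self._checkout_branch(branch)

    def after_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        # Commit on whichever branch the run wrote to, then switch back.
        message = f"Update from kedro run: {run_params['run_id']}"
        self.commits.append((self._active_branch, message))
        if self._original_branch is not None:
            self._checkout_branch(self._original_branch)


hook = FakeDoltHook()
params = {"run_id": "2021-05-03T20.51.27.129Z", "extra_params": {"branch": "new"}}
hook.before_pipeline_run(params)
# ... the pipeline's nodes would run here, writing tables on branch "new" ...
hook.after_pipeline_run(params)
print(hook.commits, hook._active_branch)
```

The commit lands on the requested branch, and the active branch is back to its starting value once the run finishes, so a later run without a branch parameter is unaffected.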
Hooks are declared by populating settings.py with our database configuration:
from kedro_dolt_demo.hooks import ProjectHooks
DOLT_DATABASE="kedro_dolt_demo"
HOOKS = (ProjectHooks(database=DOLT_DATABASE),)
Diffs
Now that we've initialised our repo and connected our hook, we can execute our workflow:
$ kedro run --params example_test_data_ratio:0.1
$ kedro run --params example_test_data_ratio:0.2
The data from these runs populates our Dolt database:
$ dolt log
commit m3112s3uuird3rtjt28cdeitp5prp6td
Author: Lim Hoang <limdauto@gmail.com>
Date: Fri Apr 23 23:46:04 +0100 2021
Update data from Kedro run: 2021-04-23T22.45.45.157Z
commit jc77hh54t97na1hs8i6k8b5pfrh7tiej
Author: Lim Hoang <limdauto@gmail.com>
Date: Fri Apr 23 23:45:01 +0100 2021
Update data from Kedro run 2021-04-23T22.44.41.926Z
We ran our pipeline with two different hyperparameter values so that we can compare the outputs with dolt diff. Diffs traverse the database to efficiently surface modifications between tables:
$ dolt diff HEAD^ HEAD example_test_x --limit 5
diff --dolt a/example_test_x b/example_test_x
--- a/example_test_x @ jc77hh54t97na1hs8i6k8b5pfrh7tiej
+++ b/example_test_x @ m3112s3uuird3rtjt28cdeitp5prp6td
+-----+--------------+-------------+--------------+-------------+
| | sepal_length | sepal_width | petal_length | petal_width |
+-----+--------------+-------------+--------------+-------------+
| + | 7.2 | 3.6 | 6.1 | 2.5 |
| + | 4.9 | 3 | 1.4 | 0.2 |
| + | 6.7 | 3.1 | 5.6 | 2.4 |
| + | 6.7 | 3 | 5 | 1.7 |
| + | 4.4 | 2.9 | 1.4 | 0.2 |
+-----+--------------+-------------+--------------+-------------+
Dolt's system tables expose the same diffs in a form that is easier to consume programmatically:
$ dolt sql -q "
select to_sepal_width, from_sepal_width, diff_type, to_commit, from_commit
from dolt_diff_example_test_x
where from_commit = hashof('HEAD^') and
to_commit = hashof('HEAD')
limit 5"
+----------------+------------------+-----------+----------------------------------+----------------------------------+
| to_sepal_width | from_sepal_width | diff_type | to_commit | from_commit |
+----------------+------------------+-----------+----------------------------------+----------------------------------+
| 3.3 | NULL | added | bku4billo5s1ldr3qkmej0gffi0urv1q | um0nq2pue9d8vgfsc878arosuala7c6e |
| NULL | 2.4 | removed | bku4billo5s1ldr3qkmej0gffi0urv1q | um0nq2pue9d8vgfsc878arosuala7c6e |
| NULL | 3 | removed | bku4billo5s1ldr3qkmej0gffi0urv1q | um0nq2pue9d8vgfsc878arosuala7c6e |
| 3.5 | NULL | added | bku4billo5s1ldr3qkmej0gffi0urv1q | um0nq2pue9d8vgfsc878arosuala7c6e |
| 3 | NULL | added | bku4billo5s1ldr3qkmej0gffi0urv1q | um0nq2pue9d8vgfsc878arosuala7c6e |
+----------------+------------------+-----------+----------------------------------+----------------------------------+
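When the diff needs to be consumed from Python rather than the shell, the query text can be generated by a small helper. The sketch below is an assumption-laden convenience, not part of the plugin: build_diff_query is a hypothetical function, the validation rules are deliberately conservative, and only the dolt_diff_<table> naming and the hashof() call are taken from the query above.

```python
import re


def build_diff_query(table: str, from_ref: str, to_ref: str, limit: int = 5) -> str:
    """Build a query against the dolt_diff_<table> system table."""
    # Table names cannot be bound as SQL parameters, so validate them strictly.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
        raise ValueError(f"unsafe table name: {table!r}")
    # Allow only simple refs such as HEAD, HEAD^, HEAD~2 or a branch name.
    for ref in (from_ref, to_ref):
        if not re.fullmatch(r"[A-Za-z0-9_\-/]+[\^~0-9]*", ref):
            raise ValueError(f"unsafe ref: {ref!r}")
    return (
        f"select diff_type, to_commit, from_commit "
        f"from dolt_diff_{table} "
        f"where from_commit = hashof('{from_ref}') "
        f"and to_commit = hashof('{to_ref}') "
        f"limit {int(limit)}"
    )


query = build_diff_query("example_test_x", "HEAD^", "HEAD")
print(query)
```

The resulting string could then be handed to a client such as pandas.read_sql together with the connection string from the catalog. That step is left out here because it needs a running Dolt SQL server.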
Branching
Here we run our workflow in a fresh branch, isolating our workflow from others:
$ kedro run --params branch:new,example_test_data_ratio:0.3
Branch diffs work the same way as commit diffs:
$ dolt diff master new example_test_x --limit 5
diff --dolt a/example_test_x b/example_test_x
--- a/example_test_x @ phat5gjncsmcr6ke7hkm71kqi3p3fh2r
+++ b/example_test_x @ icsc2i6mdempgq0dvbocuu90mgfb0jf8
+-----+--------------+-------------+--------------+-------------+
| | sepal_length | sepal_width | petal_length | petal_width |
+-----+--------------+-------------+--------------+-------------+
| + | 6.8 | 3.2 | 5.9 | 2.3 |
| - | 6.4 | 2.7 | 5.3 | 1.9 |
| - | 4.8 | 3.4 | 1.6 | 0.2 |
| - | 5.1 | 3.8 | 1.5 | 0.3 |
| + | 4.9 | 2.5 | 4.5 | 1.7 |
+-----+--------------+-------------+--------------+-------------+
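The --params strings used in the commands above end up as the extra_params dictionary that the hook inspects for a "branch" key. The sketch below shows one simplified way such a string could be turned into a dictionary. It is an illustration only: parse_params is a hypothetical helper, and Kedro's own command-line parsing handles more cases than this.

```python
from typing import Any, Dict


def parse_params(raw: str) -> Dict[str, Any]:
    """Parse 'key:value,key:value' into a dict, converting numeric values."""
    params: Dict[str, Any] = {}
    for item in raw.split(","):
        key, sep, value = item.partition(":")
        if not sep or not key.strip():
            raise ValueError(f"expected key:value, got {item!r}")
        value = value.strip()
        # Try int first, then float, and fall back to the raw string.
        try:
            parsed: Any = int(value)
        except ValueError:
            try:
                parsed = float(value)
            except ValueError:
                parsed = value
        params[key.strip()] = parsed
    return params


extra_params = parse_params("branch:new,example_test_data_ratio:0.3")
print(extra_params)
```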
Select Query
The Iris demo does not read from our database, but you could use select statements to link dependencies using specific versions of data:
iris_predictions:
  type: pandas.SQLQueryDataSet
  sql: "SELECT * FROM example_test_y AS OF HASHOF('new')"
  credentials:
    con: mysql+pymysql://root@localhost:3306/kedro_dolt_demo
HASHOF('new') resolves the latest commit of our new branch, letting us indirectly track its newest committed data.
We can sample this select query on the command-line:
$ dolt sql -q "select * from example_test_y as of hashof('new') limit 5"
+--------+------------+-----------+
| setosa | versicolor | virginica |
+--------+------------+-----------+
| 0 | 0 | 1 |
| 0 | 0 | 1 |
| 0 | 0 | 1 |
| 0 | 0 | 1 |
| 0 | 0 | 1 |
+--------+------------+-----------+
Run Id Search
Translating between workflow executions, versions of code and versions of data can be a useful discovery tool. We use the dolt_commits system table below to filter for a commit containing a run id of interest, 2021-05-03T20.51.27.129Z:
$ dolt sql -q "
select commit_hash, message
from dolt_commits
where message like '%2021-05-03T20.51.27.129Z'"
+----------------------------------+------------------------------------------------+
| commit_hash | message |
+----------------------------------+------------------------------------------------+
| 2vffmvs72phcuccpnmaud6fgrs8t7p4k | Update from kedro run 2021-05-03T20.51.27.129Z |
+----------------------------------+------------------------------------------------+
If we wished to index commits by hyperparameter value, we could customise the DoltHook._commit_message() method:
import json

def _commit_message(self, run_params: Dict[str, Any]):
    return f"Update from kedro run: {json.dumps(run_params['extra_params'])}"
This results in the following commit message:
$ dolt sql -q "select message from dolt_log limit 1"
+------------------------------------------+
| message |
+------------------------------------------+
| Update from kedro run: {"branch": "new"} |
+------------------------------------------+
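Once parameters are embedded as JSON, commit messages can be filtered by parameter value after they have been fetched. The sketch below assumes the message format produced by the customised _commit_message() above. The params_from_message and matches helpers are hypothetical, and the second sample message is invented for illustration.

```python
import json
from typing import Any, Dict

PREFIX = "Update from kedro run: "


def params_from_message(message: str) -> Dict[str, Any]:
    """Recover the JSON-encoded parameters from a commit message."""
    if not message.startswith(PREFIX):
        raise ValueError("not a kedro-dolt style commit message")
    # json.JSONDecodeError is a subclass of ValueError.
    return json.loads(message[len(PREFIX):])


def matches(message: str, **wanted: Any) -> bool:
    """True if every requested parameter has the requested value."""
    try:
        params = params_from_message(message)
    except ValueError:
        return False  # e.g. an older message that holds a run id, not JSON
    if not isinstance(params, dict):
        return False
    return all(params.get(key) == value for key, value in wanted.items())


messages = [
    'Update from kedro run: {"branch": "new"}',
    'Update from kedro run: {"example_test_data_ratio": 0.2}',  # invented sample
    "Update from kedro run: 2021-05-03T20.51.27.129Z",
]
on_new_branch = [m for m in messages if matches(m, branch="new")]
print(on_new_branch)
```

Messages written with the default run id format are skipped rather than raising, so the filter can run over a history that mixes both styles.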
Conclusion
In summary, we have shown how data scientists can version Kedro data in Dolt. Integrating Dolt with Kedro requires one hook to credential a data source and zero changes to existing application logic, as Dolt is dataframe compatible.
Dolt gives data scientists time-travel and diffing capabilities which can feel like superpowers. Data scientists can instantly know how and why a model changed from the data and roll back to a known good state. Acting as consultants, data scientists can clearly communicate with business stakeholders about how and why results change over time. Dolt provides a step function increase in model explainability.
The Kedro-Dolt plugin currently supports a limited set of features. We encourage you to reach out on the Kedro Discord channel if you are interested in a more advanced plugin, want to see how to run Kedro against a production Dolt instance that runs like RDS, or just want to learn more about Dolt.
If you want to find out more about Kedro, the community is a great place to start. You can also find the code in the Kedro repository on GitHub and the documentation on Read the Docs.