Upleveling Flyte’s Data Lineage Using Dolt

INTEGRATION
8 min read

Introduction

Dolt and Flyte joined forces to build two data integrations. Dolt is a SQL database that supports Git Versioning. Flyte is a workflow orchestrator for creating and evolving machine learning processes and mission-critical data. When combined, Dolt extends Flyte’s data lineage to storage layer versioning.

Logo

Flytekit release (v0.19.0) includes SQLAlchemy and Dolt plugins, two contributions that expand Flyte’s ability to integrate with relational databases. In this blog, we explain what Flyte is, what Dolt is, and how the two intersect to introduce a new way of thinking about reproducibility in machine learning workflows.

What is Flyte?

At its core, Flyte is an orchestrator responsible for quarterbacking the data and compute infrastructure of an enterprise. At a 10k foot view, Flyte is a Kubernetes (K8S) cluster that accepts, executes, and records machine learning and data processing workflows. Flyte can run everything from Python functions to Spark jobs, Sagemaker jobs, and map workflows with thousand-task fan-outs. Jobs are submitted as K8S custom resource definitions (CRD), YAML files that support arbitrary language SDK pluggability. Flyte also comes with a handful of tools to support admins and task monitoring.

Flyte has invested heavily into data reproducibility and versioning. Data dependencies are hashed and recorded as metadata. Task lineage is compiled and saved on or before the execution time. Shared tasks are cached to short circuited workflows and optimize performance. Updates to tasks themselves are versioned and recorded. In a scenario where one wants to know the changes introduced in the data, Dolt’s data versioning can help enhance Flyte’s data lineage.

Flyte Data Typing

Flyte’s type system provides a backbone for reproducibility and versioning. Extra work upfront defining schemas comes to bear in the form of better data quality and metadata management when compiling and executing compute graphs. This comes full circle later with Dolt’s SQL versioning, the same principles applied at the row layer.

The example below, adapted from an XGBoost tutorial in the Flyte documentation shows the type system in action.

@task
def my_task(
       dataset: FlyteFile[typing.TypeVar("csv")]
) -> pandas.DataFrame:
   return pandas.read_csv(dataset)

@workflow
def my_workflow(
   dataset: FlyteFile[
       typing.TypeVar("csv")
   ] =
   "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv",
) -> int:
   df = my_task(dataset=dataset)
   return df.shape[0]

Without digging into the weeds too much, the input source (dataset) switches between two identities. In my_workflow, dataset is a static FlyteFile type:

scalar {
  blob {
    metadata {
      type {
        format: "csv"
      }
    }
    uri: "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
  }
}

At execution time, the orchestrator downloads this file, records an md5 hash and other metadata, and injects a Python path object into my_task:

In[1]: df = pandas.read_csv(dataset, header=None)
In[2]: df.head()
   0    1   2   3    4     5      6   7  8
0  6  148  72  35    0  33.6  0.627  50  1
1  1   85  66  29    0  26.6  0.351  31  0
2  8  183  64   0    0  23.3  0.672  32  1
3  1   89  66  23   94  28.1  0.167  21  0
4  0  137  40  35  168  43.1  2.288  33  1

Rigid typing is an interesting point of differentiation between Flyte and other workflow managers, and key for extending Flyte with data plugins like Dolt.

What is Dolt?

Dolt is a novel relational database with Git versioning. Dolt can do everything MySQL does, including act as a drop-in replacement for application databases. That is one of the reasons we added Flyte’s SQLAlchemy plugin. The select statement below performs the same whether a MySQL RDS or Dolt server is running on uri:

SQLAlchemyTask(
    name="test-task",
    query_template="select * from my_table",
    task_config=SQLAlchemyConfig(
        uri=mysql://user:password@localhost/db",
    ),
)

Dolt was designed from the ground up for versioning data. Dolt’s range of Git features includes cloning, branching, merging, and diffing databases. Dolt databases live in folders like git repositories, and all the git commands you know for version control work exactly the same on the command line in a dolt directory.

Dolt takes Flyte’s versioning one step further, letting users understand how data sources evolve over time. Two table versions can be diffed to see row-level changes. Branches can isolate experiments from others’ work. Data can be queried by commit version, timestamp, or branch name without leaving SQL. Recording execution IDs in dolt commits helps audit data changes.

How It Works

Setup

Using the Flyte-Dolt Plugin will require Flyte and Dolt dependencies. We will use a flytesnacks tutorial for this demo. To follow along, first clone flytesnacks and navigate to the dolt cookbook:

git clone https://github.com/flyteorg/flytesnacks
cd flytesnacks/cookbook/integrations/flytekit_plugins/dolt

We will need several Python dependencies, including flytekitplugins-dolt:

$ pip install -r requirements.txt

We will also need to install dolt:

$ sudo bash -c 'curl -L https://github.com/dolthub/dolt/releases/latest/download/install.sh | sudo bash'

Our workflow reads from a foo database, which can be initialized locally:

$ mkdir foo
$ cd foo
$ dolt init

And we are ready to get started! To test the plugin, run quickstart_example.py, a short workflow that creates, writes, and reads a dataframe:

$ python quickstart_example.py 1
Running quickstart_example.py main...
Running wf(), returns dataframe
          name  count
0  Sugar Maple      3
1        Alice      2
2       George      1
name     object
count     int64
dtype: object

Diffing

This first workflow is comprised of two tasks: populate_rabbits and unwrap_rabbits:

@task
def populate_rabbits(a: int) -> DoltTable:
    rabbits = [("George", a), ("Alice", a*2), ("Sugar Maple", a*3)]
    df = pd.DataFrame(rabbits, columns=["name", "count"])
    return DoltTable(data=df, config=rabbits_conf)

@task
def unwrap_rabbits(table: DoltTable) -> pd.DataFrame:
    return table.data

populate_rabbits creates a dataframe and stores the table in Dolt. unwrap_rabbits does the exact opposite, reading the table from Dolt and returning a DataFrame. Our workflow combines these two steps:

@workflow
def wf(a: int) -> pd.DataFrame:
    rabbits = populate_rabbits(a=a)
    df = unwrap_rabbits(table=rabbits)
    return df

It might appear that this workflow accomplishes little. Any Python script can attach a data attribute to an object and immediately read it back. Flyte’s backend and typing system perform a lot of work under the hood to save the data to Dolt via this configuration:

rabbits_conf = DoltConfig(
    db_path=os.path.join(os.path.dirname(__file__), "foo"),
    tablename="rabbits",
)

Running the workflow again with a different hyperparameter saves a different table:

$ python quickstart_example.py 2
Running quickstart_example.py main...
Running wf(), returns dataframe
          name  count
0        Alice      4
1       George      2
2  Sugar Maple      6
name     object
count     int64
dtype: object

which we can view by inspecting the foo database:

$ cd foo
$ dolt diff HEAD HEAD^ rabbits
diff --dolt a/rabbits b/rabbits
--- a/rabbits @ 61q42dk4ft4f16rh10m9u2as8ko1fr0g
+++ b/rabbits @ ho8q4rnqle80g3ucfbukf9gp4ichv4bl
+-----+-------------+-------+
|     | name        | count |
+-----+-------------+-------+
|  -  | Alice       | 4     |
|  -  | George      | 2     |
|  +  | Sugar Maple | 3     |
|  -  | Sugar Maple | 6     |
|  +  | Alice       | 2     |
|  +  | George      | 1     |
+-----+-------------+-------+

Diffing can be useful productivity or debugging tool. Histograms and distributional shifts help understand the difference between two datasets, but it is also useful to know what rows are shared at a more granular level. Dolt’s structural sharing is the only way to get efficient row-level diffs of data.

Branching

A second example isolates workflow runs using Dolt branches.

The configuration here is more complicated. We will create new branches for different hyperparameter values:

def generate_confs(a: int) -> typing.Tuple[DoltConfig, DoltConfig, DoltConfig]:
   users_conf = DoltConfig(
       db_path=doltdb_path,
       tablename="users",
       branch_conf=NewBranch(f"run/a_is_{a}")
   )

   query_users = DoltTable(
       config=DoltConfig(
           db_path=doltdb_path,
           sql="select * from users where `count` > 5",
           branch_conf=NewBranch(f"run/a_is_{a}"),
       ),
   )

   big_users_conf = DoltConfig(
       db_path=doltdb_path,
       tablename="big_users",
       branch_conf=NewBranch(f"run/a_is_{a}"),
   )

   return users_conf, query_users, big_users_conf

In this example, we have users and big_users tables. The first and last config reference their respective tablenames. The middle config, query_users, is a read-only config that selects users whose count value is greater than 5.

The workflow has two logical steps after populating the users database: filtering the data using our query configuration, and then counting the number of rows returned by the query. populate_users and filter_users are identical to the previous example, refer to the source for the full code:

@workflow
def wf(a: int) -> int:
   user_conf, query_conf, big_user_conf = generate_confs(a=a)
   users = populate_users(a=a, conf=user_conf)
   big_users = filter_users(a=a, all_users=users, filtered_users=query_conf, conf=big_user_conf)
   big_user_cnt = count_users(users=big_users)
   return big_user_cnt

We will run this workflow twice:

$ python branch_example.py 2
Running branch_example.py main...
Running wf(), returns int
1
<class 'int'>
$ python branch_example.py 3
Running branch_example.py main...
Running wf(), returns int
2
<class 'int'>

Which creates distinct branches for our two a values:

$ cd foo
$ dolt branch
* master
  run/a_is_2
  run/a_is_3

As in the first example, we can still diff the two runs:

$ dolt diff run/a_is_2 run/a_is_3
diff --dolt a/big_users b/big_users
--- a/big_users @ 0o5jmq18cmrop7u1otc9mppo2bgraecc
+++ b/big_users @ rgerfd58r0oko1tfn3d1h0kh4a5f4s0v
+-----+-------+
|     | name  |
+-----+-------+
|  +  | Alice |
+-----+-------+
diff --dolt a/users b/users
--- a/users @ t1d0htq6upsbs302s8olga9jdq34bnqe
+++ b/users @ dn31t9b8ssien91nm7lhup22jda45sdt
+-----+-----------+-------+
|     | name      | count |
+-----+-----------+-------+
|  -  | George    | 2     |
|  -  | Stephanie | 6     |
|  +  | George    | 3     |
|  +  | Stephanie | 9     |
|  +  | Alice     | 6     |
|  -  | Alice     | 4     |
+-----+-----------+-------+

Flyte’s caching prevents unnecessary task re-runs, but Dolt will also no-op if a workflow rerun creates no data mutations:

$ python branch_example.py 3
$ dolt log run/a_is_3

Time Travel

Dolt stores data as a commit graph, just like Git. This means you can query data by branch name, as in the last example, or by timestamp or commit hash.

If we remember the query_users config from the last example:

config=DoltConfig(
    db_path=doltdb_path,
    sql="select * from users where `count` > 5",
    branch_conf=NewBranch(f"run/a_is_{a}"),
)

We notice that the query is just a MySQL select statement. Dolt supports AS OF syntax in this statement, pointing to the table at a specific point in time. We use the command line to test how this works:

$ dolt sql --query “select * from users as of HASHOF(‘run/a_is_3’) where `count` > 5"
+-----------+-------+
| name      | count |
+-----------+-------+
| Stephanie | 9     |
| Alice     | 6     |
+-----------+-------+

HASHOF(‘run/a_is_3’) is a quick way to get the most recent commit of the run/a_is_3 branch. If you wanted to pass the specific commit string, you can use these helpers:

$ dolt sql --query “select HASHOF(‘run/a_is_3’) as commit”
+----------------------------------+
| commit                           |
+----------------------------------+
| np2l4f9lq4g5bpl3ghjhbgfo3lv3plkt |
+----------------------------------+

$ dolt log run/a_is_3 -n 1
commit np2l4f9lq4g5bpl3ghjhbgfo3lv3plkt
Author: Bojack Horseman <bojack@horseman.com>
Date:   Fri May 07 12:38:03 -0700 2021

    Generated by Flyte execution id: ex:local:local:local

There are several ways you could use time travel and asof queries:

  • Fetch the most recent data in a specific branch

  • Input the 5 most recent versions of a table

  • Only input newly added rows in the last commit

  • Input data from exactly one month ago

Summary

We have discussed Flyte’s workflow automation software, Dolt’s versioned database, and why you might use Dolt in lieu of static CSV’s to version your data pipelines. We demonstrated a branching organization that creates different workspaces for different versions of an execution run, and how to perform diffs and time travel between those branches.

Dolt schemas are version controlled and gate check row entry, extending Flyte's typing assertions through the data layer. Dolt's branch and merge features are the only efficient solution for collaborating between isolated data workspaces, which when paired with Dolt's lineage history creates a new breed of data catalog. Dolt's storage layer is the only data structure that supports efficient diffing while maintaining decades of B tree optimizations, which lets you avoid manually trying to find the difference between train_final_v5.csv and train_final_actually_v2.csv when something breaks in production.

If you are interested in learning more about Dolt or Flyte please reach out to us in our community channels! We would love to hear about your experiences with data pipelines.

SHARE

JOIN THE DATA EVOLUTION

Get started with Dolt

Or join our mailing list to get product updates.