Using Debezium to Capture Changes from a Dolt Database

A few weeks ago, we announced the first release of support for Dolt-to-MySQL replication. Dolt now supports replicating both to and from a MySQL database through the MySQL binlog protocol. In addition to replicating databases to another server, the MySQL binlog replication protocol is also super useful for change data capture use cases. With the new support for Dolt-to-MySQL replication, there are some exciting new ways to use Dolt in a change data capture pipeline. In this blog post, we're going to look at how you can run a Dolt database and use Debezium to capture all the changes to it and easily notify other systems of that changing data.

What is Debezium?

Debezium is an open-source distributed platform for change data capture (CDC). It's widely used for tracking changes in databases and forwarding those changes to downstream applications in real-time. Debezium captures row-level changes from database tables and generates events that are published to Kafka topics, allowing applications to consume them and react as changes occur. This is crucial for building data-driven applications, synchronizing data across microservices, and ensuring real-time analytics.

Debezium is popular for several reasons. First, it captures changes in near real-time to help keep data up-to-date across different systems without the need for complex and resource-intensive polling mechanisms. Second, Debezium supports a wide range of databases (e.g. MySQL, PostgreSQL, MongoDB, Oracle, and now Dolt!), making it a good choice for many environments. Finally, Debezium's integration with Apache Kafka allows for seamless streaming and processing of data changes, leveraging Kafka's ecosystem, while preventing consumers from needing to tightly couple apps to their database.

How does Debezium work with Dolt?

Let's take a closer look at the architecture of a system with Debezium set up to provide change data capture events from a Dolt SQL server.

Debezium is a powerful tool, and relies on several system-level dependencies, such as Apache Kafka and ZooKeeper, to function. This allows Debezium to feed your applications data changes without requiring tight coupling of your application directly to the database. This loose coupling does mean that setting up a Debezium pipeline can be a bit complex, since there are several moving parts that need to be configured and managed.

Debezium Architecture

In the image above, we see the Dolt SQL server streams binlog events to the Debezium Connector for MySQL. This connector knows how to connect to a MySQL-compatible server, establish a binlog stream, and read events. It converts those MySQL binlog events to a change record format and publishes them to a Kafka topic. The Debezium Connector sits in the Kafka Connect framework, which is an adapter framework that enables various data source integrations to easily publish to a Kafka system.

Apache Kafka is an open-source distributed event streaming platform that decouples communication between event producers and event consumers. It serves as the messaging backbone for Debezium. There are several benefits of using Apache Kafka in this architecture: your applications don't need to have a direct connection to your database, it's easy for multiple applications to consume the same events, and Kafka handles event persistence so your application doesn't need to worry about that.

Finally, Kafka relies on Apache ZooKeeper for persisting topic, partition, and broker configuration, as well as for leader election.

Demo

Now that you know the high-level system architecture for how Dolt integrates with Debezium, let's put those pieces together and see it in action. This demo is adapted from the Debezium Getting Started Tutorial, so you can reference the tutorial for more information and context about each part of the system.

To run this demo, you'll need Docker installed and configured on your machine. Each part of the system, including Dolt, will run in its own Docker container. You'll also need the curl command line utility and the mysql client installed. We'll be using a handful of terminal windows to start up each part of the system.
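
If you want to double-check your setup before diving in, each prerequisite can be verified from the command line. Any reasonably recent versions should work fine:

docker --version
mysql --version
curl --version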

Terminal 1: Apache ZooKeeper

In the first terminal window, we'll start up Apache ZooKeeper. Kafka uses ZooKeeper to keep track of topics, partitions, and their respective leaders, so we'll need to get ZooKeeper started up before we can start any other parts of the system. The following docker command will launch the ZooKeeper image from the Debezium Getting Started Tutorial and ensure a few ports are exposed from the container.

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.5

You should see some output like below. Keep an eye out for any error messages that might indicate something went wrong.

Starting up in standalone mode
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2024-07-16 17:15:33,873 - INFO  [main:o.a.z.s.q.QuorumPeerConfig@177] - Reading configuration from: /zookeeper/conf/zoo.cfg
2024-07-16 17:15:33,876 - INFO  [main:o.a.z.s.q.QuorumPeerConfig@440] - clientPortAddress is 0.0.0.0:2181
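
If you want an extra sanity check that ZooKeeper is serving requests, you can try its ruok four-letter command with netcat; a healthy server answers imok. One caveat: ZooKeeper 3.5 and later only respond to four-letter commands that have been whitelisted via 4lw.commands.whitelist, so an empty reply doesn't necessarily mean the container is unhealthy.

echo ruok | nc localhost 2181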

Terminal 2: Apache Kafka

Once ZooKeeper has started up successfully, open up a new terminal window where we can launch Apache Kafka. Kafka serves as the delivery mechanism for data change events from Debezium. Run the docker command below to start up a Kafka container. Note that we use --link to allow the Kafka container to communicate with the ZooKeeper container.

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.5

Again, keep an eye out for any errors or stack traces in case something goes wrong. If everything starts up correctly, you should see output similar to this:

2024-07-16 17:15:45,731 - INFO  [main:AppInfoParser$AppInfo@119] - Kafka version: 3.6.1
2024-07-16 17:15:45,731 - INFO  [main:AppInfoParser$AppInfo@120] - Kafka commitId: 5e3c2b738d253ff5
2024-07-16 17:15:45,731 - INFO  [main:AppInfoParser$AppInfo@121] - Kafka startTimeMs: 1721150145728
2024-07-16 17:15:45,732 - INFO  [main:Logging@66] - [KafkaServer id=1] started
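
If you'd like to confirm the broker is reachable, you can run Kafka's standard CLI tools from inside the container. Assuming the Kafka distribution lives at /kafka in the Debezium image (worth double-checking for your image version), this lists the topics on the broker, which may well be empty at this point:

docker exec -it kafka /kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list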

Terminal 3: Dolt SQL Server

Now that Kafka is up and running, let's start up a Dolt SQL server that will stream binlog events to Debezium.

We're using --pull always in the Docker command below to make sure you get the latest version of the Dolt SQL server Docker image. So, if you already have an older image version pulled, this option ensures you'll run the latest version of Dolt.

docker run --pull always --name dolt -p 3306:3306 --detach dolthub/dolt-sql-server:latest 

Note that we're running this container without attaching to it because we're going to come back and restart it in the next step.
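
Before moving on, it's worth confirming that the detached server is accepting connections. Dolt's dolt_version() function makes for a quick smoke test:

mysql -uroot --protocol TCP -e "SELECT dolt_version();"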

Terminal 4: Dolt SQL Shell

In a new terminal window, use the mysql client to connect to the Dolt server we just started up.

mysql -uroot --protocol TCP

First, we're going to configure a few persisted options to turn on binary logging the next time we restart this server. Note that we're using SET @@PERSIST here to set these variables, since we need them to survive server restarts: to enable replication, these system variables must be set before the server starts up, and setting them after the server is started does not enable replication.

SET @@PERSIST.log_bin=ON;
SET @@PERSIST.gtid_mode=ON;
SET @@PERSIST.enforce_gtid_consistency=ON;
SET @@PERSIST.time_zone='-07:00';

After you've set those system variables, jump back to terminal window #3, where we started the Dolt container, and restart the container to turn on binary logging.

docker restart dolt
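
Once the container is back up (give it a second or two), you can verify that the persisted settings took effect:

mysql -uroot --protocol TCP -e "SELECT @@log_bin, @@gtid_mode, @@enforce_gtid_consistency;"

All three should report as enabled.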

Next, back in the SQL shell from terminal #4 (reconnect with the same mysql command if the restart dropped your session), create the inventory database and the customers table that the watch-topic container we start later in this demo will be watching for changes.

CREATE DATABASE inventory;
use inventory;
create table customers (pk int primary key, name varchar(100));

Keep this SQL shell open; we'll come back to it at the end of this demo to test some changes.

Terminal 5: Kafka Connect Container

Just a couple more pieces to set up now! In another new terminal shell, we're going to start up the Kafka Connect container. The Kafka container we started earlier provides the platform for event publishing and consuming, while Kafka Connect is an adapter framework that allows us to load in integrations with different data sources. After we get this container started up, we'll load in the Debezium MySQL connector, which is what will read binlog events from our Dolt SQL server and publish them to the Kafka event system for distribution to the other parts of our application that are listening on our Kafka topic.

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link dolt --link kafka quay.io/debezium/connect:2.5

Terminal 6: Kafka Connect Admin Shell

Next, we need to load the Debezium connector into the Kafka Connect container we launched in the last step. Open up a new terminal, and we'll use the curl command to send some requests to the Kafka Connect container's REST API.

First, let's do a sanity check and make sure we can reach the Kafka Connect container:

curl -H "Accept:application/json" localhost:8083/

You should see some output like below. If you don't, go back and look at your logs and see if there were any errors starting up any of the containers.

{"version":"3.6.1","commit":"5e3c2b738d253ff5","kafka_cluster_id":"RXtFPjx2SDCHu3avhI_9qg"}

Next, we'll load in the Debezium MySQL connector with the command below. You can see how this JSON payload specifies the connector class (io.debezium.connector.mysql.MySqlConnector), the hostname and port for the database, and the Kafka topic prefix and bootstrap server to send events to.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "dolt", "database.port": "3306", "database.user": "root", "database.password": "", "database.server.id": "1", "topic.prefix": "dbserver1", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.inventory" } }'
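
That one-liner is dense, so it can be easier to keep the connector configuration in a file and point curl at it instead. Here's the same payload formatted for readability, saved as register-dolt.json (a file name we're choosing just for this demo):

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "dolt",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "",
    "database.server.id": "1",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}

Then register it with:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @register-dolt.json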

If this works correctly, you should see an HTTP 201 response like this:

HTTP/1.1 201 Created
Date: Tue, 16 Jul 2024 17:20:01 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 505
Server: Jetty(9.4.52.v20230823)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"dolt","database.port":"3306","database.user":"root","database.password":"","database.server.id":"1","topic.prefix":"dbserver1","database.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}

You can test that the connector is running correctly by hitting the connectors resource and verifying that the inventory connector is present.

curl -H "Accept:application/json" localhost:8083/connectors/

You should see:

["inventory-connector"]

Finally, you can take a closer look at the inventory connector by running the command below:

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

You should get an HTTP 200 response that contains the configuration for the inventory-connector, as well as a tasks element that shows us the connector is running.

HTTP/1.1 200 OK
Date: Tue, 16 Jul 2024 17:20:13 GMT
Content-Type: application/json
Content-Length: 549
Server: Jetty(9.4.52.v20230823)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"root","topic.prefix":"dbserver1","schema.history.internal.kafka.topic":"schema-changes.inventory","database.server.id":"1","tasks.max":"1","database.hostname":"dolt","database.password":"","name":"inventory-connector","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","database.port":"3306","database.include.list":"inventory"},"tasks":[{"connector":"inventory-connector","task":0}],"type":"source"}
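
If anything looks off, the connector's status endpoint is the first place to check when debugging; it reports whether the connector and each of its tasks are RUNNING or FAILED, along with a stack trace if a task has failed:

curl -H "Accept:application/json" localhost:8083/connectors/inventory-connector/status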

Terminal 7: Watch Changes Shell

We're in the final stretch now! The system is up and running, and all that's left is to kick the tires. This last Docker container we're going to start simply listens on the Kafka topic where Debezium is sending events and prints them out to the console.

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.5 watch-topic -a -k dbserver1.inventory.customers
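
The watch-topic script is a small convenience utility that ships in the Debezium Kafka image. If you'd rather use Kafka's stock tooling, something like the following should produce an equivalent stream, assuming the Kafka distribution lives at /kafka inside the image (worth verifying for your image version); the print.key property tells the consumer to emit the key document alongside the value, like watch-topic's -k flag:

docker exec -it kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.inventory.customers --from-beginning --property print.key=true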

Test Some Changes

And now, the moment of truth! If everything has been hooked up correctly, then when we change the data in our Dolt database, we should see those changes printed out in the watch-topic container we just started.

Inserts

Let's jump back into our Dolt SQL Shell that we started in terminal #4 and insert a row into the customers table:

insert into customers values (1, 'Alice'); 

Switch back to the watch-topic container running in terminal #7, and you should see a big block of JSON printed out. There are two separate JSON documents displayed for the row we inserted. Each JSON document contains a schema element and a payload element. The first document's payload identifies the primary key of the row that changed, and the second document's payload describes the row's data before and after the change. To make it easier to read, I've omitted the schema element and am only showing the payload element for each document, but if you want to see the full output, you can expand the details section below.

Here's the payload of the first document, which identifies the key of the row that changed:

{
  "schema": ...
  "payload": {
    "pk": 1
  }
}

And here's the payload of the second document, which shows us that this row did not exist before this change (i.e. null) and after the change it contains the primary key 1 and the name Alice. Note that it also includes a source section that contains some metadata about where the change came from.

{
  "schema": ...
  "payload": {
    "before": null,
    "after": {
      "pk": 1,
      "name": "Alice"
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1721153143000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": "5d9f7521-dbe6-46bd-a5dd-f5ae568c00ec:8",
      "file": "binlog-main.000001",
      "pos": 1599,
      "row": 0,
      "thread": 0,
      "query": null
    },
    "op": "c",
    "ts_ms": 1721166337320,
    "transaction": null
  }
}
Click to see the full JSON output of both documents
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "pk"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "pk": 1
  }
}

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "pk"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "pk"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "pk": 1,
      "name": "Alice"
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1721153143000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": "5d9f7521-dbe6-46bd-a5dd-f5ae568c00ec:8",
      "file": "binlog-main.000001",
      "pos": 1599,
      "row": 0,
      "thread": 0,
      "query": null
    },
    "op": "c",
    "ts_ms": 1721166337320,
    "transaction": null
  }
}

Updates

Let's try another data change. This time we'll update the name of the customer with primary key 1 to be White Rabbit.

update customers set name='White Rabbit' where pk=1; 

Just like before, there are two JSON documents displayed by the watch-topic container. The first document identifies the key of the row that changed, and the second document shows us the row's data before and after the change.

Since we're modifying the same row we inserted in the last step, the first JSON document, which identifies the row that changed, shows the exact same payload section:

{
  "schema": ...
  "payload": {
    "pk": 1
  }
}

The payload in the second JSON document looks a bit different though. Since this is an UPDATE operation, the before section is populated with the contents of the row from before our update ({1, "Alice"}) and the after section shows the new row values ({1, "White Rabbit"}):

{
  "schema": ...
  "payload": {
    "before": {
      "pk": 1,
      "name": "Alice"
    },
    "after": {
      "pk": 1,
      "name": "White Rabbit"
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1721153143000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": "5d9f7521-dbe6-46bd-a5dd-f5ae568c00ec:10",
      "file": "binlog-main.000001",
      "pos": 2080,
      "row": 0,
      "thread": 0,
      "query": null
    },
    "op": "u",
    "ts_ms": 1721241642927,
    "transaction": null
  }
}
Click to see the full JSON output of both documents
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "pk"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "pk": 1
  }
}

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "pk"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "pk"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": {
      "pk": 1,
      "name": "Alice"
    },
    "after": {
      "pk": 1,
      "name": "White Rabbit"
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1721153143000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": "5d9f7521-dbe6-46bd-a5dd-f5ae568c00ec:10",
      "file": "binlog-main.000001",
      "pos": 2080,
      "row": 0,
      "thread": 0,
      "query": null
    },
    "op": "u",
    "ts_ms": 1721241642927,
    "transaction": null
  }
}

Deletes

Finally, let's try deleting the row from our table so we can see what the JSON output looks like in that case.

delete from customers where pk=1;

Again, the first JSON document identifies the key of the row that changed:

{
  "schema": ...
  "payload": {
    "pk": 1
  }
}

And the second JSON document shows us the before and after values of that row. For a DELETE operation, the before section shows us the row values that existed before the row was deleted, and the after section is null, indicating that the row no longer exists:

{
  "schema": ...
  "payload": {
    "before": {
      "pk": 1,
      "name": "White Rabbit"
    },
    "after": null,
    "source": {
      "version": "2.5.4.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1721153143000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": "5d9f7521-dbe6-46bd-a5dd-f5ae568c00ec:11",
      "file": "binlog-main.000001",
      "pos": 2350,
      "row": 0,
      "thread": 0,
      "query": null
    },
    "op": "d",
    "ts_ms": 1721242327938,
    "transaction": null
  }
}
Click to see the full JSON output of both documents
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "pk"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "pk": 1
  }
}

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "pk"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "pk"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": {
      "pk": 1,
      "name": "White Rabbit"
    },
    "after": null,
    "source": {
      "version": "2.5.4.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1721153143000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": "5d9f7521-dbe6-46bd-a5dd-f5ae568c00ec:11",
      "file": "binlog-main.000001",
      "pos": 2350,
      "row": 0,
      "thread": 0,
      "query": null
    },
    "op": "d",
    "ts_ms": 1721242327938,
    "transaction": null
  }
}
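
One more detail worth knowing about deletes: by default (the connector's tombstones.on.delete setting defaults to true), Debezium follows each delete event with a tombstone record, i.e. the same key with a null value, so that Kafka log compaction can eventually drop the row entirely. Depending on how your consumer is configured, you may see an extra null printed after the delete event above.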

Dolt Versioning Commands

Alright, so inserts, updates, and deletes are all working as expected. How about something a little more advanced? Let's try merging in changes from another Dolt branch.

Back in our Dolt SQL shell, let's commit the changes we've made on the main branch and then checkout a new branch called data-import:

call dolt_commit('-Am', 'Creating customers table');
call dolt_checkout('-b', 'data-import');

At this point, our session is using the new data-import branch, so any changes we make here won't show up on main yet. By default, Dolt-to-MySQL replication replicates the main branch, but you can configure it to replicate any branch you choose, as sketched below. One important note: because of the way MySQL's binlog protocol works, only a single branch can be replicated via Dolt-to-MySQL replication.
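
As a sketch of what that configuration looks like: Dolt exposes a system variable that selects the branch to replicate, and like the other replication settings it needs to be persisted before the server starts up. We believe the variable is named log_bin_branch, but check the Dolt replication docs for the authoritative name:

SET @@PERSIST.log_bin_branch = 'main';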

Let's add a new column to the customers table and insert some data. To keep it simple, we'll add a single row, but you could imagine using this data-import branch to import a large number of records, get the data import reviewed by a teammate, and correct any data issues, before merging the data changes back to the main branch.

alter table customers add column address varchar(255);
insert into customers values (100, 'Cheshire Cat', 'Wonderland');
call dolt_commit('-am', 'Importing customers');

Before we merge these changes back to main, let's quickly jump over to the watch-topic container and verify that no new events have been reported. That's expected, because we've been editing data on the data-import branch, and we won't see any data change events until changes land on the main branch.

Okay, let's switch back to our main branch and take a look at the customers table:

call dolt_checkout('main');
select * from customers;
Empty set (0.00 sec)

There are no rows in that table, just like we expected. Now let's merge in the changes from our data-import branch and then see what the data looks like:

call dolt_merge('data-import');
select * from customers;
+-----+--------------+------------+
| pk  | name         | address    |
+-----+--------------+------------+
| 100 | Cheshire Cat | Wonderland |
+-----+--------------+------------+
1 row in set (0.01 sec)

There's our new data! When we jump back to the watch-topic container, we should see some more JSON events explaining how our data changed during the merge. Just as in the previous examples, the first JSON document identifies the key of the row that changed:

{
  "schema": ...
  "payload": {
    "pk": 100
  }
}

The second JSON document shows us the before and after values of that row. Note that we also made a schema change to the customers table on our data-import branch, and as expected, we see that reflected in the JSON output:

{
  "schema": ...
  "payload": {
    "before": null,
    "after": {
      "pk": 100,
      "name": "Cheshire Cat",
      "address": "Wonderland"
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1721339900000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": "5d9f7521-dbe6-46bd-a5dd-f5ae568c00ec:19",
      "file": "binlog-main.000001",
      "pos": 1230,
      "row": 0,
      "thread": 0,
      "query": null
    },
    "op": "c",
    "ts_ms": 1721340287408,
    "transaction": null
  }
}
Click to see the full JSON output of both documents
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "pk"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "pk": 100
  }
}

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "pk"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "address"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "pk"
          },
          {
            "type": "string",
            "optional": true,
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "address"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "pk": 100,
      "name": "Cheshire Cat",
      "address": "Wonderland"
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1721339900000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": "5d9f7521-dbe6-46bd-a5dd-f5ae568c00ec:19",
      "file": "binlog-main.000001",
      "pos": 1230,
      "row": 0,
      "thread": 0,
      "query": null
    },
    "op": "c",
    "ts_ms": 1721340287408,
    "transaction": null
  }
}

Conclusion

Debezium is a powerful tool for change data capture use cases. It allows you to hook up real-time data change notifications to various parts of your system, while still keeping your system loosely coupled. Now that Dolt supports Dolt-to-MySQL replication, via the MySQL binlog replication protocol, it's easy to set up Debezium to monitor the changes on a branch in your Dolt database, and notify other parts of your system when data changes. This can be used to communicate data changes to a central data platform, to power analytics processes, to implement compliance controls, and much, much more.

If you're interested in change data capture, database replication, or data versioning, we hope you'll join us on Discord to talk with our engineering team and other Dolt users.
