Using python-mysql-replication with Dolt

Dolt is the world's first version controlled SQL database – you can branch/diff/fork/merge/rebase your data, in all the same ways that Git lets you work with source code files. Thanks to Dolt's custom storage engine, these operations to compute diffs and to merge branches are extremely fast, too. ⚡️ Dolt is fully compatible with MySQL, so you can use it as a drop-in replacement anywhere you're running a MySQL server. We've been highlighting this compatibility recently, and in today's post, we're taking a look at another MySQL tool that works with Dolt.

Python MySQL Replication

Lately, I've been writing about Dolt's support for Dolt-to-MySQL replication, using the MySQL binlog replication protocol. In addition to Dolt's native replication format that's optimized for Dolt-to-Dolt replication, we also support the MySQL binlog replication protocol for compatibility with MySQL replication. In 2023, we launched MySQL-to-Dolt replication, and this year, we launched Dolt-to-MySQL replication. That means a Dolt SQL server can now serve as either a source or a replica in a MySQL replication topology.

One great use case for Dolt-to-MySQL replication is to enable Change Data Capture tools to monitor data changes and alert other parts of your system when certain data changes. In our previous post on Dolt-to-MySQL replication, we showed how you can hook up Debezium to monitor a Dolt branch for data changes. Debezium is a great solution for change data capture systems, especially if you're already running Kafka for an event streaming platform. In this post, we're going to look at a similar, but simpler tool – the Python MySQL Replication library, python-mysql-replication. This library works in a very similar way to Debezium, by connecting to a MySQL compatible server and consuming a stream of binlog events. The biggest difference is that this library doesn't transform the raw events and publish them to a Kafka topic; instead, you write Python code that directly processes the binlog events. Debezium is a great choice for a very robust and scalable solution, but for many applications that don't need that level of complexity, python-mysql-replication can be a great fit.

Demo

Let's jump right in and see the python-mysql-replication library in action. We'll start by setting up a Dolt SQL server, then we'll write a simple Python script to show the changes being made in our Dolt database, and finally we'll extend that script to do something useful when a specific data change occurs.

Dolt SQL Server

First things first, we need a Dolt SQL server to connect to. We could create a new database here, but for this demo, we'll clone the existing dolthub/employees database from DoltHub, so that we've got some sample data to work with. This database has a few tables that represent a fake company's employees, their titles, and their salaries.

# Create a new directory to hold our cloned Dolt database and the Python script we'll write later 
mkdir pythonMysqlReplication && cd pythonMysqlReplication

# Clone the dolthub/employees database 
dolt clone dolthub/employees
cd employees

Now that we've got a Dolt database to work with, let's set a few options to enable binlog replication support before we start up the server. These options must be set before the Dolt SQL server starts up, otherwise binlog replication won't be enabled, so we'll use dolt sql to persist a few system variables. Setting @@log_bin is what enables data changes to be recorded in binary log events, and @@gtid_mode and @@enforce_gtid_consistency are required for replication with GTID positioning (the only positioning mode supported by Dolt).

dolt sql -q "SET @@PERSIST.log_bin=ON;"
dolt sql -q "SET @@PERSIST.gtid_mode=ON;"
dolt sql -q "SET @@PERSIST.enforce_gtid_consistency=ON;"

After setting those options to enable binlog replication, we need to set one more option for the python-mysql-replication library to work with Dolt. The library requires the @@binlog_row_metadata system variable to be set to FULL (the default is MINIMAL). Although Dolt doesn't fully support sending full row metadata, we can set this system variable to FULL to keep the library happy. This limitation means that we won't be able to look up row values by column name, but we can still reference them by their ordinal position, as we'll see later. If you hit issues with compatibility here, please send us a GitHub issue to request full support for @@binlog_row_metadata and we'll be happy to dig into the details.

dolt sql -q "SET @@PERSIST.binlog_row_metadata=FULL;"
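
Once the server is started in the next step, it can be worth confirming that all four variables took effect. The sketch below is a hypothetical helper, not part of Dolt or python-mysql-replication. It takes a dict of variable names to values, which you could collect with a query such as select @@log_bin, @@gtid_mode, @@enforce_gtid_consistency, @@binlog_row_metadata; from any MySQL client, and returns the names of the settings that don't match. How the server reports boolean values (1 versus ON) is an assumption here, so the helper accepts both forms.

```python
# Hypothetical helper: not part of Dolt or python-mysql-replication.
# The expected values mirror the four SET @@PERSIST statements above.
EXPECTED_SETTINGS = {
    "log_bin": "ON",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
    "binlog_row_metadata": "FULL",
}


def normalize(value):
    """Fold common spellings of a boolean system variable into ON/OFF."""
    text = str(value).strip().upper()
    if text in ("1", "TRUE"):
        return "ON"
    if text in ("0", "FALSE"):
        return "OFF"
    return text


def misconfigured(actual):
    """Return the sorted names of settings that are missing or have the wrong value."""
    return sorted(
        name
        for name, expected in EXPECTED_SETTINGS.items()
        if name not in actual or normalize(actual[name]) != expected
    )


# Illustrative values only (not real server output): the row metadata
# variable was left at its default, so it is the one that gets reported.
reported = {
    "log_bin": 1,
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
    "binlog_row_metadata": "MINIMAL",
}
print(misconfigured(reported))
```

An empty list means all four settings look right. Anything else names the variables to persist again before restarting the server.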

Now that we've configured our replication options through those system variables, we're ready to start up the Dolt SQL server:

dolt sql-server --loglevel DEBUG 

The dolt sql-server process will stay in the foreground in our terminal and print out any log messages. You should see a message like Enabling binary logging if you got the system variables set up correctly. Leave this running here, and let's go open a new terminal window, then use the mysql command line client to connect to our running database server.

mysql -uroot --protocol TCP employees

Once you're logged into the Dolt SQL server, you can view all the tables and run some queries to take a look at what's in this database.

mysql> show tables;
+----------------------+
| Tables_in_employees  |
+----------------------+
| current_dept_emp     |
| departments          |
| dept_emp             |
| dept_emp_latest_date |
| dept_manager         |
| employees            |
| salaries             |
| titles               |
+----------------------+
8 rows in set (0.00 sec)

mysql> select * from employees limit 5;
+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date  |
+--------+------------+------------+-----------+--------+------------+
|  10001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
|  10002 | 1964-06-02 | Bezalel    | Simmel    | F      | 1985-11-21 |
|  10003 | 1959-12-03 | Parto      | Bamford   | M      | 1986-08-28 |
|  10004 | 1954-05-01 | Chirstian  | Koblick   | M      | 1986-12-01 |
|  10005 | 1955-01-21 | Kyoichi    | Maliniak  | M      | 1989-09-12 |
+--------+------------+------------+-----------+--------+------------+
5 rows in set (0.00 sec)

Finally, before we head to a new terminal window, let's grab the server's UUID. We'll need this later to pass in to the python-mysql-replication library.

mysql> select @@server_uuid;
+--------------------------------------+
| @@server_uuid                        |
+--------------------------------------+
| 2cd4d78c-65ac-492d-bd15-7a063529cbdb |
+--------------------------------------+
1 row in set (0.00 sec)

Hello python-mysql-replication World!

The python-mysql-replication library comes with an example dump_events.py script that simply prints out the events it's capturing. I started with this example and made a few small tweaks to make it work for Dolt. You can download the Gist from GitHub, or just copy the code below into a dump_events.py file.

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import HeartbeatLogEvent

mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}

stream = BinLogStreamReader(connection_settings=mysql_settings, blocking=True, server_id=100, auto_position='2cd4d78c-65ac-492d-bd15-7a063529cbdb:1')

for binlogevent in stream:
    if not isinstance(binlogevent, HeartbeatLogEvent):
        binlogevent.dump()

stream.close()

Before you can run this script, you'll need to adjust the auto_position parameter, but first, let's explain each of the options we're passing to BinLogStreamReader:

  • blocking=True – this option tells the library to use normal, or blocking replication, instead of the non-blocking replication mode. You must specify this option when working with Dolt, since Dolt does not currently support MySQL's non-blocking replication mode. If you don't specify it, python-mysql-replication will default to blocking=False and you won't receive events from the Dolt server.
  • server_id=100 – every node in a MySQL replication topology must use a unique server ID. The value doesn't really matter, as long as it's unique, so choose your favorite integer here.
  • auto_position='2cd4d78c-65ac-492d-bd15-7a063529cbdb:1' – this is the GTID position that the library will send to the Dolt server so that the Dolt server knows what events need to be streamed to the library. python-mysql-replication needs this parameter to be specified in order to use GTID auto-positioning (the only replication positioning mode that Dolt supports). You must change this to use your Dolt server's unique @@server_uuid! Take the result of the select @@server_uuid; query you ran against the Dolt SQL server in the last step, add ":1" at the end to signal the very first GTID on that server, and use that as the value for the auto_position parameter.
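
If you'd rather not paste the UUID by hand, the steps in the last bullet can be sketched in Python. Both function names below are hypothetical. fetch_server_uuid assumes the same connection settings as the script and uses PyMySQL, the client library that python-mysql-replication itself depends on. first_gtid_position only formats the string, so it works without a server.

```python
import uuid


def first_gtid_position(server_uuid):
    """Build the auto_position value for the very first GTID on a server.

    Raises ValueError if server_uuid is not a well-formed UUID.
    """
    parsed = uuid.UUID(server_uuid.strip())  # validates the format
    return f"{parsed}:1"


def fetch_server_uuid(host="127.0.0.1", port=3306, user="root", password=""):
    """Ask a running server for its @@server_uuid (needs a live connection)."""
    import pymysql  # imported lazily so first_gtid_position works without it

    connection = pymysql.connect(host=host, port=port, user=user, password=password)
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT @@server_uuid")
            return cursor.fetchone()[0]
    finally:
        connection.close()


# Using the UUID from this post's example server:
print(first_gtid_position("2cd4d78c-65ac-492d-bd15-7a063529cbdb"))
```

With a server running, first_gtid_position(fetch_server_uuid()) would produce the value to pass as auto_position.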

The only other difference between our script and the example from the python-mysql-replication library is that we're filtering out HeartbeatLogEvent events. These events are sent by the MySQL server to ensure that the connection is still alive, and we don't need to process them in our script, so we just ignore them.

Let's try running our script...

python dump_events.py

=== RotateEvent ===
Position: 4
Next binlog file: binlog-main.000002

=== FormatDescriptionEvent ===
Date: 2024-08-05T19:52:12
Log position: 120
Event size: 93
Read bytes: 73
Binlog version: (4,)
mysql version: 5.6.33-0ubuntu0.14.04.1-log
Created: 1722887532
Common header length: 19
Post header length: (56, 13, 0, 8, 0, 18, 0, 4, 4, 4, 4, 18)
Server version split: (0, 0, 92)
Number of event types: 0

=== PreviousGtidsEvent ===
Date: 2024-08-05T19:52:12
Log position: 151
Event size: 8
Read bytes: 8
previous_gtids: 

If the script connected correctly, then you should see a few events printed out immediately. Every MySQL binlog event stream starts off with a RotateEvent to let the connected replica know what file is being served, and then a FormatDescriptionEvent to tell the connected replica how the binlog events are formatted, and then finally a PreviousGtidsEvent that tells the connected replica where the GTID position is.

At this point, our script is waiting for more events to come over the connection. Let's jump back over to the terminal window with our SQL shell open to our Dolt SQL server. We'll make some changes and see if they show up in the output of our Python program.

CREATE TABLE newTable(pk int primary key, col1 varchar(200));
INSERT INTO newTable VALUES (1, "hello world!");

Back in our terminal window with our running Python process, we see several more events generated from the two statements we executed above:

=== GtidEvent ===
Date: 2024-08-05T23:06:01
Log position: 239
Event size: 25
Read bytes: 26
Commit: True
GTID_NEXT: 2cd4d78c-65ac-492d-bd15-7a063529cbdb:9

=== QueryEvent ===
Date: 2024-08-05T23:06:01
Log position: 441
Event size: 179
Read bytes: 179
Schema: b'employees'
Execution time: 0
Query: CREATE TABLE `newTable` (
  `pk` int NOT NULL,
  `col1` varchar(200),
  PRIMARY KEY (`pk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin;

=== GtidEvent ===
Date: 2024-08-05T23:06:01
Log position: 489
Event size: 25
Read bytes: 26
Commit: True
GTID_NEXT: 2cd4d78c-65ac-492d-bd15-7a063529cbdb:10

=== QueryEvent ===
Date: 2024-08-05T23:06:01
Log position: 540
Event size: 28
Read bytes: 28
Schema: b'employees'
Execution time: 0
Query: BEGIN

=== TableMapEvent ===
Date: 2024-08-05T23:06:01
Log position: 599
Event size: 36
Read bytes: 36
Table id: 3
Schema: employees
Table: newTable
Columns: 2
=== OptionalMetaData ===
unsigned_column_list: []
default_charset_collation: None
charset_collation: {}
column_charset: []
column_name_list: []
set_str_value_list : []
set_enum_str_value_list : []
geometry_type_list : []
simple_primary_key_list: []
primary_keys_with_prefix: {}
visibility_list: []
charset_collation_list: []
enum_and_set_collation_list: []

=== WriteRowsEvent ===
Date: 2024-08-05T23:06:01
Log position: 653
Event size: 31
Read bytes: 12
Table: employees.newTable
Affected columns: 2
Changed rows: 1
Column Name Information Flag: False
Values:
--
* UNKNOWN_COL0 : 1
* UNKNOWN_COL1 : hello world!

=== XidEvent ===
Date: 2024-08-05T23:06:01
Log position: 684
Event size: 8
Read bytes: 8
Transaction ID: 0

Each SQL transaction is preceded by a GtidEvent that marks the transaction boundary. The DDL CREATE TABLE statement is represented by a QueryEvent and the insert into the table is represented by a TableMapEvent that describes the table being updated and a WriteRowsEvent that includes the data being inserted. The final XidEvent marks the end of the DML transaction.
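
To make that structure concrete, here is a minimal sketch that splits a flat list of event type names into one group per transaction, starting a new group at each GtidEvent. The function name is hypothetical. In a real script, you could get each name with type(binlogevent).__name__ while iterating over the stream.

```python
def group_by_transaction(event_names):
    """Split a flat sequence of event type names into per-transaction lists.

    A new group starts at every GtidEvent. Events seen before the first
    GtidEvent (such as RotateEvent) are collected into a leading group.
    """
    groups = []
    for name in event_names:
        if name == "GtidEvent" or not groups:
            groups.append([])
        groups[-1].append(name)
    return groups


# The event types from the CREATE TABLE and INSERT statements above.
observed = [
    "GtidEvent", "QueryEvent",
    "GtidEvent", "QueryEvent", "TableMapEvent", "WriteRowsEvent", "XidEvent",
]
for transaction in group_by_transaction(observed):
    print(" -> ".join(transaction))
```

This prints one line per transaction: the first for the CREATE TABLE statement, the second for the INSERT.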

This is the basic structure of how the python-mysql-replication library works. It exposes the raw binlog events for you to process in your Python code. As the Dolt database changes, your Python app receives binlog events describing those changes. The example so far only used some simple SQL statements, but we can also use all of Dolt's version control functionality and also receive binlog events describing how those change the database's data. For example, the changes we've made so far are still in our current working set, because we haven't created a Dolt commit to add them to the database's commit graph. We can add them to the commit graph by running:

call dolt_commit('-Am', 'Adding a new table');

From the SQL shell, we can view the commit graph by looking at the dolt_log system table:

select * from dolt_log;
+----------------------------------+-----------+-----------------+---------------------+----------------------------+
| commit_hash                      | committer | email           | date                | message                    |
+----------------------------------+-----------+-----------------+---------------------+----------------------------+
| 66snjo19cfinmqnf73okdlq8m1t3b7nn | root      | root@%          | 2024-08-06 18:11:55 | Adding a new table         |
| a3p5ulod2rerno98191lhqnuhhfgppno | timsehn   | tim@dolthub.com | 2023-11-27 23:28:41 | imported data              |
| oi4sc8120pevl344fv60b865s2p73169 | timsehn   | tim@dolthub.com | 2023-11-27 23:15:25 | Initialize data repository |
+----------------------------------+-----------+-----------------+---------------------+----------------------------+
3 rows in set (0.00 sec)

If we want to clear out those changes in the most recent commit, we can use the dolt_reset() stored procedure to reset back to any other commit:

call dolt_reset('--hard', 'HEAD~1');

Back in the terminal window with our Python script running, we see a bunch more binlog events printed out that undo creating the newTable table:

=== GtidEvent ===
Date: 2024-08-06T16:47:52
Log position: 1301
Event size: 25
Read bytes: 26
Commit: True
GTID_NEXT: 7c53ef27-9e8b-41c0-af2a-073dd9afcc6b:6

=== QueryEvent ===
Date: 2024-08-06T16:47:52
Log position: 1369
Event size: 45
Read bytes: 45
Schema: b'employees'
Execution time: 0
Query: DROP TABLE `newTable`;

Now that you know the basics of how the python-mysql-replication library works, let's see how we can modify our script to do something a little more useful.

Monitoring for New Hires

Okay, so now that we've got a simple working example with the python-mysql-replication library, let's take this one step further and do something useful. Let's say that whenever a new employee is added to the database, we want to trigger a notification to our IT team to get the new hire's laptop and email account set up. We can do this by changing our script to monitor for WriteRowsEvent binlog events to the employees table and then trigger some sort of custom action to notify the IT team, such as sending them a Slack message or pushing a message into a queue.

Grab the source code below and copy it into a file called monitor_new_hires.py, or download it from this GitHub Gist.

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent

mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}

stream = BinLogStreamReader(connection_settings=mysql_settings, blocking=True, server_id=100, auto_position='2cd4d78c-65ac-492d-bd15-7a063529cbdb:1')

for binlogevent in stream:
    if isinstance(binlogevent, WriteRowsEvent) and binlogevent.table == "employees":
        for row in binlogevent.rows:
            # NOTE: Because Dolt doesn't send the full row metadata, we don't have column names available to us here,
            #       so we access the columns by their ordinal position instead (UNKNOWN_COL0, UNKNOWN_COL1, ...).
            first_name = row["values"]["UNKNOWN_COL2"]
            last_name  = row["values"]["UNKNOWN_COL3"]
            start_date = row["values"]["UNKNOWN_COL5"]
            print(f"Detected new hire ({last_name}, {first_name}) starting on {start_date}")
            # TODO: From here, you could notify the IT team about the new hire.
            #       For example, you could send a message to a Slack channel, or you could send a message
            #       to a queue that another system monitors.

stream.close()

Don't forget to swap out the UUID in the auto_position value for your running Dolt server's @@server_uuid, keeping the ":1" suffix! You can grab this value by querying the Dolt server with the select @@server_uuid; statement.

Then go ahead and run the script.

python monitor_new_hires.py

Now let's go back to our SQL shell and insert a new employee into the database:

insert into employees values (1000001, "1981-02-16", "Cooper", "McBear", "M", "2020-02-20");

And back on the terminal where we're running our Python script, you should see the output:

Detected new hire (McBear, Cooper) starting on 2020-02-20

And there you have it! With just a few lines of Python code, we have an application that is monitoring our Dolt database and taking action based on the type of data changes it sees happening. You could imagine extending this to support all sorts of different actions on data changes, such as deprovisioning accounts when employees leave the company, or sending alerts if a department has reached their hiring budget for the year.
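
As one possible next step, here is a rough sketch of the "push a message into a queue" idea. It also gives the UNKNOWN_COL keys readable names, based on the column order of the employees table shown earlier. The function names are hypothetical, and the standard library's in-process queue.Queue stands in for whatever messaging system you actually use. The row values are written as plain strings for illustration.

```python
import queue

# Column order of the employees table, taken from the SELECT output earlier.
EMPLOYEE_COLUMNS = ["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"]


def name_columns(values, column_names):
    """Translate UNKNOWN_COL<n> keys into real column names by position."""
    return {name: values[f"UNKNOWN_COL{index}"] for index, name in enumerate(column_names)}


def queue_new_hire(values, notifications):
    """Put a notification for one inserted employees row onto the queue."""
    employee = name_columns(values, EMPLOYEE_COLUMNS)
    message = (
        f"New hire {employee['first_name']} {employee['last_name']} "
        f"(emp_no {employee['emp_no']}) starts on {employee['hire_date']}"
    )
    notifications.put(message)
    return message


# Stand-in for row["values"] from a WriteRowsEvent on the employees table.
row_values = {
    "UNKNOWN_COL0": 1000001,
    "UNKNOWN_COL1": "1981-02-16",
    "UNKNOWN_COL2": "Cooper",
    "UNKNOWN_COL3": "McBear",
    "UNKNOWN_COL4": "M",
    "UNKNOWN_COL5": "2020-02-20",
}

notifications = queue.Queue()
queue_new_hire(row_values, notifications)
print(notifications.get_nowait())
```

Inside the monitoring loop, you would call queue_new_hire(row["values"], notifications) in place of the print statement. Keep in mind that the positional mapping needs updating if the table's column order ever changes.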

Conclusion

The python-mysql-replication library is a great way to capture changes from a MySQL compatible database, like Dolt. It's a lightweight tool that lets you easily connect to a stream of binlog events and write custom Python code to take action on the changes. We saw how easy it was to connect this library to a Dolt SQL server and how to process the raw binlog events that it captures and exposes to your Python application. We also saw that Dolt's version control features that change data, like dolt_reset() or dolt_merge(), generate the same binlog events as any other data change.

If you're interested in change data capture, database replication, or data versioning, we hope you'll join us on Discord to talk with our engineering team and other Dolt users.
