Commit 179b2f5

author
Hoang Phan
committed
Add support for binlog state
1 parent 288afce commit 179b2f5

File tree

6 files changed

+561
-46
lines changed

python/sources/mysql_cdc/README.md

Lines changed: 126 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
# MySQL CDC
22

3-
This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication.
3+
This connector demonstrates how to capture changes to a MySQL database table (using CDC) and publish the change events to a Kafka topic using MySQL binary log replication. It features **persistent binlog position tracking** to ensure exactly-once processing and automatic recovery after restarts.
4+
5+
## Key Features
6+
7+
- **Persistent Binlog Position**: Automatically saves and resumes from the last processed binlog position
8+
- **Exactly-Once Processing**: No data loss during application restarts or failures
9+
- **Initial Snapshot**: Optionally capture existing data before starting CDC
10+
- **Automatic Recovery**: Seamlessly resume processing after interruptions
11+
- **Change Buffering**: Batches changes for efficient Kafka publishing
412

513
## How to run
614

@@ -11,8 +19,7 @@ This connector demonstrates how to capture changes to a MySQL database table (us
1119

1220
## Environment variables
1321

14-
The connector uses the following environment variables:
15-
22+
### Required MySQL Connection
1623
- **output**: Name of the output topic to write into.
1724
- **MYSQL_HOST**: The IP address or fully qualified domain name of your MySQL server.
1825
- **MYSQL_PORT**: The port number to use for communication with the server (default: 3306).
@@ -22,11 +29,127 @@ The connector uses the following environment variables:
2229
- **MYSQL_SCHEMA**: The name of the schema/database for CDC (same as MYSQL_DATABASE).
2330
- **MYSQL_TABLE**: The name of the table for CDC.
2431

32+
### Optional Configuration
33+
- **MYSQL_SNAPSHOT_HOST**: MySQL host for the initial snapshot (defaults to MYSQL_HOST if not set). Use this to take the initial snapshot from a different MySQL instance (e.g., a read replica).
34+
- **INITIAL_SNAPSHOT**: Set to "true" to perform initial snapshot of existing data (default: false).
35+
- **SNAPSHOT_BATCH_SIZE**: Number of rows to process in each snapshot batch (default: 1000).
36+
- **FORCE_SNAPSHOT**: Set to "true" to force snapshot even if already completed (default: false).
37+
38+
### State Management
39+
- **Quix__State__Dir**: Directory for storing application state including binlog positions (default: "state").
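
The optional and state variables above could be read with their documented defaults roughly as follows. This is a minimal sketch, not the connector's actual code: the `CdcSettings` class and `load_settings` function are hypothetical names, and only a subset of the variables is covered.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _as_bool(value: str) -> bool:
    # The README documents "true" as the enabling value; anything else is False.
    return value.strip().lower() == "true"


@dataclass(frozen=True)
class CdcSettings:  # hypothetical container, not part of the connector
    host: str
    port: int
    snapshot_host: str
    initial_snapshot: bool
    snapshot_batch_size: int
    force_snapshot: bool
    state_dir: str


def load_settings(env: Mapping[str, str] = os.environ) -> CdcSettings:
    host = env["MYSQL_HOST"]  # required, so a missing value raises KeyError
    return CdcSettings(
        host=host,
        port=int(env.get("MYSQL_PORT", "3306")),
        # Falls back to MYSQL_HOST when unset or empty.
        snapshot_host=env.get("MYSQL_SNAPSHOT_HOST") or host,
        initial_snapshot=_as_bool(env.get("INITIAL_SNAPSHOT", "false")),
        snapshot_batch_size=int(env.get("SNAPSHOT_BATCH_SIZE", "1000")),
        force_snapshot=_as_bool(env.get("FORCE_SNAPSHOT", "false")),
        state_dir=env.get("Quix__State__Dir", "state"),
    )
```

Passing a plain dictionary instead of `os.environ` keeps the parsing easy to test.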
40+
41+
## Binlog Position Persistence
42+
43+
The connector automatically tracks the MySQL binlog position and saves it to disk after successful Kafka delivery. This ensures:
44+
45+
- **No data loss** during application restarts
46+
- **Exactly-once processing** of database changes
47+
- **Automatic resumption** from the last processed position
48+
49+
Position files are stored in: `{STATE_DIR}/binlog_position_{schema}_{table}.json`
50+
51+
Example position file:
52+
```json
53+
{
54+
"log_file": "mysql-bin.000123",
55+
"log_pos": 45678,
56+
"timestamp": 1704067200.0,
57+
"readable_time": "2024-01-01 00:00:00 UTC"
58+
}
59+
```
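
A position file with this shape could be written and read back as sketched below. The helper names (`position_path`, `save_position`, `load_position`) are hypothetical and the connector's real implementation may differ. The write goes through a temporary file and `os.replace`, so a crash mid-write should not leave a truncated file behind, and an unreadable file is treated as "no saved position".

```python
import json
import os
import tempfile
import time
from pathlib import Path
from typing import Optional, Tuple


def position_path(state_dir: str, schema: str, table: str) -> Path:
    # Mirrors the documented {STATE_DIR}/binlog_position_{schema}_{table}.json layout.
    return Path(state_dir) / f"binlog_position_{schema}_{table}.json"


def save_position(path: Path, log_file: str, log_pos: int) -> None:
    now = time.time()
    payload = {
        "log_file": log_file,
        "log_pos": log_pos,
        "timestamp": now,
        "readable_time": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(now)),
    }
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file in the same directory, then atomically swap it in.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as tmp:
        json.dump(payload, tmp)
        tmp.flush()
        os.fsync(tmp.fileno())
    os.replace(tmp_name, path)


def load_position(path: Path) -> Optional[Tuple[str, int]]:
    # Returns None when the file is missing or corrupt, so the caller can
    # decide to start from the current binlog position instead.
    try:
        data = json.loads(path.read_text())
        return data["log_file"], int(data["log_pos"])
    except (FileNotFoundError, KeyError, TypeError, ValueError):
        return None
```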
60+
61+
## Initial Snapshot
62+
63+
Enable initial snapshot to capture existing table data before starting CDC:
64+
65+
```env
66+
INITIAL_SNAPSHOT=true
67+
SNAPSHOT_BATCH_SIZE=1000
68+
MYSQL_SNAPSHOT_HOST=replica.mysql.example.com # Optional: use read replica
69+
```
70+
71+
The initial snapshot:
72+
- Processes data in configurable batches to avoid memory issues
73+
- Sends snapshot records with `"kind": "snapshot_insert"` to distinguish from real inserts
74+
- Marks completion to avoid re-processing on restart
75+
- Can be forced to re-run with `FORCE_SNAPSHOT=true`
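
The batching described above can be sketched against any DB-API 2.0 cursor using `fetchmany`. The function name `snapshot_batches` is hypothetical, and the source does not show how the connector actually pages through the table. The demo uses the standard-library `sqlite3` module only so the example runs without a MySQL server.

```python
import sqlite3
from typing import Any, Dict, Iterator, List


def snapshot_batches(
    cursor: Any, schema: str, table: str, batch_size: int = 1000
) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of snapshot_insert events, at most batch_size rows each."""
    # The table name comes from trusted configuration (MYSQL_TABLE), not user input.
    cursor.execute(f"SELECT * FROM {table}")
    columns = [col[0] for col in cursor.description]
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield [
            {
                "kind": "snapshot_insert",
                "schema": schema,
                "table": table,
                "columnnames": columns,
                "columnvalues": list(row),
                "oldkeys": {},
            }
            for row in rows
        ]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO users VALUES (?, ?)", [(i, f"user{i}") for i in range(5)])
    for batch in snapshot_batches(conn.cursor(), "demo", "users", batch_size=2):
        print(len(batch), batch[0]["columnvalues"])
```

With some MySQL drivers the default cursor buffers the whole result set on the client, so an unbuffered (server-side) cursor may be needed for the batch size to actually bound memory use.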
76+
2577
## Requirements / Prerequisites
2678

2779
- A MySQL Database with binary logging enabled.
2880
- Set `log-bin=mysql-bin` and `binlog-format=ROW` in MySQL configuration.
2981
- MySQL user with `REPLICATION SLAVE` and `REPLICATION CLIENT` privileges.
82+
- For initial snapshot: `SELECT` privilege on the target table.
83+
84+
### MySQL Configuration Example
85+
```ini
86+
[mysqld]
87+
server-id = 1
88+
log_bin = /var/log/mysql/mysql-bin.log
89+
binlog_expire_logs_seconds = 864000
90+
max_binlog_size = 100M
91+
binlog-format = ROW
92+
binlog_row_metadata = FULL
93+
binlog_row_image = FULL
94+
```
95+
96+
### MySQL User Permissions
97+
```sql
98+
-- Create replication user
99+
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password';
100+
101+
-- Grant replication privileges
102+
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
103+
104+
-- Grant select for initial snapshot
105+
GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%';
106+
107+
FLUSH PRIVILEGES;
108+
```
109+
110+
## Change Event Format
111+
112+
### INSERT/Snapshot Insert
113+
```json
114+
{
115+
"kind": "insert", // or "snapshot_insert" for initial snapshot
116+
"schema": "database_name",
117+
"table": "table_name",
118+
"columnnames": ["id", "name"],
119+
"columnvalues": [123, "John Doe"],
120+
"oldkeys": {}
121+
}
122+
```
123+
124+
### UPDATE
125+
```json
126+
{
127+
"kind": "update",
128+
"schema": "database_name",
129+
"table": "table_name",
130+
"columnnames": ["id", "name"],
131+
"columnvalues": [123, "Jane Doe"],
132+
"oldkeys": {
133+
"keynames": ["id", "name"],
134+
"keyvalues": [123, "John Doe"]
135+
}
136+
}
137+
```
138+
139+
### DELETE
140+
```json
141+
{
142+
"kind": "delete",
143+
"schema": "database_name",
144+
"table": "table_name",
145+
"columnnames": [],
146+
"columnvalues": [],
147+
"oldkeys": {
148+
"keynames": ["id", "name"],
149+
"keyvalues": [123, "Jane Doe"]
150+
}
151+
}
152+
```
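
The three event shapes above could be produced from row dictionaries with small builders like these. The function names are hypothetical and assume each row arrives as a column-to-value mapping; the source does not show the connector's own serialization code.

```python
from typing import Any, Dict, Mapping


def _base(kind: str, schema: str, table: str) -> Dict[str, Any]:
    return {"kind": kind, "schema": schema, "table": table}


def _keys(row: Mapping[str, Any]) -> Dict[str, Any]:
    return {"keynames": list(row.keys()), "keyvalues": list(row.values())}


def insert_event(schema: str, table: str, row: Mapping[str, Any], kind: str = "insert") -> Dict[str, Any]:
    # Pass kind="snapshot_insert" for rows emitted by the initial snapshot.
    event = _base(kind, schema, table)
    event.update(columnnames=list(row.keys()), columnvalues=list(row.values()), oldkeys={})
    return event


def update_event(schema: str, table: str, before: Mapping[str, Any], after: Mapping[str, Any]) -> Dict[str, Any]:
    event = _base("update", schema, table)
    event.update(columnnames=list(after.keys()), columnvalues=list(after.values()), oldkeys=_keys(before))
    return event


def delete_event(schema: str, table: str, before: Mapping[str, Any]) -> Dict[str, Any]:
    event = _base("delete", schema, table)
    event.update(columnnames=[], columnvalues=[], oldkeys=_keys(before))
    return event
```

A consumer can then branch on `kind` and read the previous row image from `oldkeys` for updates and deletes.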
30153

31154
## Contribute
32155

python/sources/mysql_cdc/README_MYSQL_CDC.md

Lines changed: 166 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,50 @@
11
# MySQL CDC Setup
22

3-
This application implements MySQL CDC using MySQL binary log replication.
3+
This application implements MySQL CDC using MySQL binary log replication with **persistent binlog position tracking** for exactly-once processing and automatic recovery.
4+
5+
## Key Features
6+
7+
- **Persistent Binlog Position**: Automatically saves and resumes from the last processed binlog position
8+
- **Exactly-Once Processing**: No data loss during application restarts or failures
9+
- **Initial Snapshot**: Optionally capture existing data before starting CDC
10+
- **Automatic Recovery**: Seamlessly resume processing after interruptions
11+
- **Change Buffering**: Batches changes for efficient Kafka publishing
12+
- **State Management**: Integrated state persistence for production reliability
413

514
## Prerequisites
615

716
1. **MySQL Configuration**: Your MySQL server must have binary logging enabled with ROW format:
817
```ini
918
# Add to MySQL configuration file (my.cnf or my.ini)
10-
log-bin=mysql-bin
11-
binlog-format=ROW
12-
server-id=1
19+
[mysqld]
20+
server-id = 1
21+
log_bin = /var/log/mysql/mysql-bin.log
22+
binlog_expire_logs_seconds = 864000
23+
max_binlog_size = 100M
24+
binlog-format = ROW
25+
binlog_row_metadata = FULL
26+
binlog_row_image = FULL
1327
```
1428

1529
2. **MySQL User Permissions**: The MySQL user needs REPLICATION SLAVE and REPLICATION CLIENT privileges:
1630
```sql
17-
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'your_user'@'%';
18-
GRANT SELECT ON your_database.your_table TO 'your_user'@'%';
31+
-- Create replication user
32+
CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password';
33+
34+
-- Grant replication privileges for CDC
35+
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
36+
37+
-- Grant select for initial snapshot (if using snapshot feature)
38+
GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%';
39+
1940
FLUSH PRIVILEGES;
2041
```
2142

2243
## Environment Variables
2344

2445
Set the following environment variables:
2546

26-
### MySQL Connection
47+
### Required MySQL Connection
2748
- `MYSQL_HOST` - MySQL server hostname (e.g., localhost)
2849
- `MYSQL_PORT` - MySQL server port (default: 3306)
2950
- `MYSQL_USER` - MySQL username
@@ -32,7 +53,16 @@ Set the following environment variables:
3253
- `MYSQL_SCHEMA` - MySQL database name (same as MYSQL_DATABASE)
3354
- `MYSQL_TABLE` - Table name to monitor for changes
3455

35-
### Kafka Output (unchanged)
56+
### Optional Configuration
57+
- `MYSQL_SNAPSHOT_HOST` - MySQL host for initial snapshot (defaults to MYSQL_HOST). Use this to snapshot from a read replica
58+
- `INITIAL_SNAPSHOT` - Set to "true" to perform initial snapshot (default: false)
59+
- `SNAPSHOT_BATCH_SIZE` - Rows per snapshot batch (default: 1000)
60+
- `FORCE_SNAPSHOT` - Set to "true" to force re-snapshot (default: false)
61+
62+
### State Management
63+
- `Quix__State__Dir` - Directory for storing state files (default: "state")
64+
65+
### Kafka Output
3666
- `output` - Kafka topic name for publishing changes
3767

3868
## Example .env file
@@ -41,16 +71,80 @@ Set the following environment variables:
4171
# MySQL Connection
4272
MYSQL_HOST=localhost
4373
MYSQL_PORT=3306
44-
MYSQL_USER=replication_user
45-
MYSQL_PASSWORD=your_password
74+
MYSQL_USER=cdc_user
75+
MYSQL_PASSWORD=secure_password
4676
MYSQL_DATABASE=your_database
4777
MYSQL_SCHEMA=your_database
4878
MYSQL_TABLE=your_table
4979
80+
# Optional: Use read replica for initial snapshot
81+
MYSQL_SNAPSHOT_HOST=replica.mysql.example.com
82+
83+
# Initial Snapshot Configuration
84+
INITIAL_SNAPSHOT=true
85+
SNAPSHOT_BATCH_SIZE=1000
86+
FORCE_SNAPSHOT=false
87+
88+
# State Management
89+
Quix__State__Dir=./state
90+
5091
# Kafka Output
5192
output=cdc-changes-topic
5293
```
5394

95+
## Binlog Position Persistence
96+
97+
The application automatically tracks MySQL binlog positions and persists them to disk:
98+
99+
### How it works:
100+
1. **Position Tracking**: Records current binlog file and position during processing
101+
2. **Automatic Saving**: Saves position after successful Kafka delivery
102+
3. **Recovery**: Automatically resumes from last saved position on restart
103+
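4. **Exactly-Once**: Aims for no data loss or duplication. Because the position is saved only after delivery, a crash between delivery and save replays the last batch on restart, so consumers should be idempotent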
104+
105+
### Position Storage:
106+
- Location: `{STATE_DIR}/binlog_position_{schema}_{table}.json`
107+
- Format:
108+
```json
109+
{
110+
"log_file": "mysql-bin.000123",
111+
"log_pos": 45678,
112+
"timestamp": 1704067200.0,
113+
"readable_time": "2024-01-01 00:00:00 UTC"
114+
}
115+
```
116+
117+
### Benefits:
118+
- **No data loss** during application restarts
119+
- **Exactly-once processing** of database changes
120+
- **Automatic recovery** from last processed position
121+
- **Production-ready** state management
122+
123+
## Initial Snapshot
124+
125+
Capture existing table data before starting real-time CDC:
126+
127+
### Configuration:
128+
```env
129+
INITIAL_SNAPSHOT=true
130+
SNAPSHOT_BATCH_SIZE=1000
131+
MYSQL_SNAPSHOT_HOST=replica.mysql.example.com # Optional
132+
```
133+
134+
### Features:
135+
- **Batched Processing**: Configurable batch sizes to handle large tables
136+
- **Memory Efficient**: Processes data in chunks to avoid memory issues
137+
- **Read Replica Support**: Use `MYSQL_SNAPSHOT_HOST` to snapshot from replica
138+
- **Completion Tracking**: Marks snapshot completion to avoid re-processing
139+
- **Force Re-snapshot**: Use `FORCE_SNAPSHOT=true` to re-run if needed
140+
141+
### Snapshot Process:
142+
1. Connects to snapshot host (or main host if not specified)
143+
2. Processes table data in batches
144+
3. Sends records with `"kind": "snapshot_insert"`
145+
4. Marks completion in state file
146+
5. Proceeds to real-time CDC
147+
54148
## Dependencies
55149

56150
Install the required Python packages:
@@ -66,6 +160,18 @@ The key MySQL-specific dependencies are:
66160

67161
The MySQL CDC produces change events in the following format:
68162

163+
### Snapshot Insert Event
164+
```json
165+
{
166+
"kind": "snapshot_insert",
167+
"schema": "database_name",
168+
"table": "table_name",
169+
"columnnames": ["col1", "col2"],
170+
"columnvalues": ["value1", "value2"],
171+
"oldkeys": {}
172+
}
173+
```
174+
69175
### INSERT Event
70176
```json
71177
{
@@ -110,15 +216,57 @@ The MySQL CDC produces change events in the following format:
110216

111217
## Running the Application
112218

113-
1. Ensure MySQL is configured with binary logging
114-
2. Set environment variables
115-
3. Run the application:
219+
1. **Configure MySQL** with binary logging enabled
220+
2. **Set environment variables** (see example above)
221+
3. **Run the application**:
116222
```bash
117223
python main.py
118224
```
119225

120-
The application will:
121-
1. Connect to MySQL and validate binary logging is enabled
122-
2. Create a binary log stream reader
123-
3. Monitor the specified table for changes
124-
4. Buffer changes and publish them to Kafka every 500ms
226+
### Application Flow:
227+
1. **Load State**: Attempts to load saved binlog position
228+
2. **Initial Snapshot** (if enabled and not completed):
229+
- Connects to snapshot host
230+
- Processes existing data in batches
231+
- Sends snapshot events to Kafka
232+
- Marks completion
233+
3. **Real-time CDC**:
234+
- Connects to MySQL binlog stream
235+
- Resumes from saved position (or current if first run)
236+
- Monitors specified table for changes
237+
- Buffers changes and publishes to Kafka every 500ms
238+
- Saves binlog position after successful delivery
239+
4. **Recovery**: On restart, automatically resumes from last saved position
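
The buffer, publish, and save-position ordering in step 3 can be sketched as below. `ChangeBuffer` and its callbacks are hypothetical names; the real connector's Kafka producer and state handling are not shown in the source. The key point is that the position is saved only after `publish` returns without raising.

```python
import time
from typing import Any, Callable, Dict, List, Optional, Tuple

Position = Tuple[str, int]  # (binlog file name, byte offset)


class ChangeBuffer:
    """Collects change events and flushes them at a fixed interval."""

    def __init__(
        self,
        publish: Callable[[List[Dict[str, Any]]], None],
        save_position: Callable[[Position], None],
        flush_interval: float = 0.5,  # the README's 500 ms cadence
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._publish = publish
        self._save_position = save_position
        self._flush_interval = flush_interval
        self._clock = clock
        self._events: List[Dict[str, Any]] = []
        self._last_position: Optional[Position] = None
        self._last_flush = clock()

    def add(self, event: Dict[str, Any], position: Position) -> None:
        self._events.append(event)
        self._last_position = position

    def maybe_flush(self) -> bool:
        """Flush if events are pending and the interval has elapsed."""
        if not self._events or self._clock() - self._last_flush < self._flush_interval:
            return False
        self.flush()
        return True

    def flush(self) -> None:
        if not self._events or self._last_position is None:
            return
        # If publish raises, nothing below runs: the events stay buffered and
        # the saved position is not advanced past undelivered changes.
        self._publish(list(self._events))
        self._save_position(self._last_position)
        self._events.clear()
        self._last_flush = self._clock()
```

With this ordering, a crash after publishing but before saving the position leads to the last batch being sent again on restart, so downstream consumers may see duplicates.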
240+
241+
### Monitoring:
242+
- Check application logs for binlog position updates
243+
- Monitor state directory for position files
244+
- Verify Kafka topic for change events
245+
- Use MySQL's `SHOW MASTER STATUS` (`SHOW BINARY LOG STATUS` on MySQL 8.4 and later, where the older statement was removed) to compare positions
246+
247+
## Troubleshooting
248+
249+
### Common Issues:
250+
251+
1. **Binary logging not enabled**:
252+
- Error: "Binary logging must be enabled for CDC"
253+
- Solution: Enable binlog in MySQL configuration and restart
254+
255+
2. **Insufficient privileges**:
256+
- Error: Access denied
257+
- Solution: Grant REPLICATION SLAVE, REPLICATION CLIENT privileges
258+
259+
3. **Position file corruption**:
260+
- Delete position file to restart from current position
261+
- Location: `{STATE_DIR}/binlog_position_{schema}_{table}.json`
262+
263+
4. **Snapshot issues**:
264+
- Check `MYSQL_SNAPSHOT_HOST` connectivity
265+
- Verify SELECT privileges on target table
266+
- Review batch size for memory constraints
267+
268+
### Best Practices:
269+
- Use read replicas for initial snapshots on large tables
270+
- Monitor disk space for state directory
271+
- Set appropriate `SNAPSHOT_BATCH_SIZE` based on available memory
272+
- Regularly back up state files for disaster recovery
