From 5465d06aa57463323e722363b1381dc7262fabad Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Mon, 30 Jun 2025 22:17:44 -0400 Subject: [PATCH 1/2] add mysql_cdc source --- conda/post-link.sh | 4 +- docs/connectors/sources/mysql-cdc-source.md | 349 ++++++++++++++++++ pyproject.toml | 5 +- .../sources/community/mysql_cdc/__init__.py | 1 + .../sources/community/mysql_cdc/mysql_cdc.py | 251 +++++++++++++ .../community/mysql_cdc/mysql_helper.py | 325 ++++++++++++++++ .../community/mysql_cdc/setup_logger.py | 13 + 7 files changed, 946 insertions(+), 2 deletions(-) create mode 100644 docs/connectors/sources/mysql-cdc-source.md create mode 100644 quixstreams/sources/community/mysql_cdc/__init__.py create mode 100644 quixstreams/sources/community/mysql_cdc/mysql_cdc.py create mode 100644 quixstreams/sources/community/mysql_cdc/mysql_helper.py create mode 100644 quixstreams/sources/community/mysql_cdc/setup_logger.py diff --git a/conda/post-link.sh b/conda/post-link.sh index 9d19e640d..e51756ac8 100644 --- a/conda/post-link.sh +++ b/conda/post-link.sh @@ -8,4 +8,6 @@ $PREFIX/bin/pip install \ 'redis[hiredis]>=5.2.0,<6' \ 'confluent-kafka[avro,json,protobuf,schemaregistry]>=2.8.2,<2.10' \ 'influxdb>=5.3,<6' \ -'jsonpath_ng>=1.7.0,<2' +'jsonpath_ng>=1.7.0,<2' \ +'pymysql>=1.0,<2' \ +'mysql-replication>=1.1,<2' diff --git a/docs/connectors/sources/mysql-cdc-source.md b/docs/connectors/sources/mysql-cdc-source.md new file mode 100644 index 000000000..8a640bd68 --- /dev/null +++ b/docs/connectors/sources/mysql-cdc-source.md @@ -0,0 +1,349 @@ +# MySQL CDC Setup + +This application implements MySQL CDC using MySQL binary log replication with **Quix Streams StatefulSource** for exactly-once processing and automatic recovery. + +## Key Features + +- **Quix Streams StatefulSource**: Built on Quix Streams' robust stateful source framework +- **Automatic State Management**: Integrated state store for binlog position and snapshot tracking +- **Exactly-Once Processing**: No data loss during application restarts or failures +- **Initial Snapshot**: Optionally capture existing data before starting CDC +- **Automatic Recovery**: Seamlessly resume processing after interruptions +- **Change Buffering**: Batches changes for efficient Kafka publishing +- **Built-in Reliability**: Leverages Quix Streams' production-ready state management + +## Prerequisites + +1. MySQL version <=8.0 + +2. **MySQL Configuration**: Your MySQL server must have binary logging enabled with ROW format: + ```ini + # Add to MySQL configuration file (my.cnf or my.ini) + [mysqld] + server-id = 1 + log_bin = /var/log/mysql/mysql-bin.log + binlog_expire_logs_seconds = 864000 + max_binlog_size = 100M + binlog-format = ROW + binlog_row_metadata = FULL + binlog_row_image = FULL + ``` + +3. 
**MySQL User Permissions**: The MySQL user needs REPLICATION SLAVE and REPLICATION CLIENT privileges: + ```sql + -- Create replication user + CREATE USER 'cdc_user'@'%' IDENTIFIED BY 'secure_password'; + + -- Grant replication privileges for CDC + GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%'; + + -- Grant select for initial snapshot (if using snapshot feature) + GRANT SELECT ON your_database.your_table TO 'cdc_user'@'%'; + + FLUSH PRIVILEGES; + ``` + +## Environment Variables + +Set the following environment variables: + +### Required MySQL Connection +- `MYSQL_HOST` - MySQL server hostname (e.g., localhost) +- `MYSQL_PORT` - MySQL server port (default: 3306) +- `MYSQL_USER` - MySQL username +- `MYSQL_PASSWORD` - MySQL password +- `MYSQL_DATABASE` - MySQL database name +- `MYSQL_SCHEMA` - MySQL database name (same as MYSQL_DATABASE) +- `MYSQL_TABLE` - Table name to monitor for changes + +### Optional Configuration +- `MYSQL_SNAPSHOT_HOST` - MySQL host for initial snapshot (defaults to MYSQL_HOST). Use this to snapshot from a read replica +- `INITIAL_SNAPSHOT` - Set to "true" to perform initial snapshot (default: false) +- `SNAPSHOT_BATCH_SIZE` - Rows per snapshot batch (default: 1000) +- `FORCE_SNAPSHOT` - Set to "true" to force re-snapshot (default: false) + +### Kafka Output +- `output` - Kafka topic name for publishing changes + +## Example .env file + +```env +# MySQL Connection +MYSQL_HOST=localhost +MYSQL_PORT=3306 +MYSQL_USER=cdc_user +MYSQL_PASSWORD=secure_password +MYSQL_DATABASE=your_database +MYSQL_SCHEMA=your_database +MYSQL_TABLE=your_table + +# Optional: Use read replica for initial snapshot +MYSQL_SNAPSHOT_HOST=replica.mysql.example.com + +# Initial Snapshot Configuration +INITIAL_SNAPSHOT=true +SNAPSHOT_BATCH_SIZE=1000 +FORCE_SNAPSHOT=false + +# Kafka Output +output=cdc-changes-topic +``` + +## Quix Streams StatefulSource Architecture + +The application uses Quix Streams' `StatefulSource` class which provides: + +### Built-in State Management: +- **Automatic Persistence**: State is automatically saved to the configured state store +- **Exactly-Once Guarantees**: Built-in mechanisms ensure no data loss or duplication +- **Transactional Processing**: State changes are committed atomically with message production +- **Fault Tolerance**: Automatic recovery from failures with consistent state + +### State Storage: +The StatefulSource manages two types of state: +1. **Binlog Position**: `binlog_position_{schema}_{table}` + ```json + { + "log_file": "mysql-bin.000123", + "log_pos": 45678, + "timestamp": 1704067200.0 + } + ``` + +2. 
**Snapshot Completion**: `snapshot_completed_{schema}_{table}` + ```json + { + "completed_at": 1704067200.0, + "schema": "database_name", + "table": "table_name", + "timestamp": "2024-01-01 12:00:00 UTC" + } + ``` + +### Benefits: +- ✅ **Production-Ready**: Built on Quix Streams' proven architecture +- ✅ **No Manual State Management**: Automatic state persistence and recovery +- ✅ **Exactly-Once Processing**: Guaranteed delivery semantics +- ✅ **Simplified Operations**: Reduced complexity compared to manual state management +- ✅ **Scalable**: Can be easily deployed and scaled in production environments + +## Initial Snapshot + +Capture existing table data before starting real-time CDC: + +### Configuration: +```env +INITIAL_SNAPSHOT=true +SNAPSHOT_BATCH_SIZE=1000 +MYSQL_SNAPSHOT_HOST=replica.mysql.example.com # Optional +``` + +### Features: +- **Batched Processing**: Configurable batch sizes to handle large tables +- **Memory Efficient**: Processes data in chunks to avoid memory issues +- **Read Replica Support**: Use `MYSQL_SNAPSHOT_HOST` to snapshot from replica +- **Completion Tracking**: Marks snapshot completion in StatefulSource state store +- **Force Re-snapshot**: Use `FORCE_SNAPSHOT=true` to re-run if needed + +### Snapshot Process: +1. Connects to snapshot host (or main host if not specified) +2. Processes table data in batches +3. Sends records with `"kind": "snapshot_insert"` +4. Marks completion in StatefulSource state store +5. Proceeds to real-time CDC + +## Dependencies + +Install the required Python packages: +```bash +pip install -r requirements.txt +``` + +The key dependencies are: +- `quixstreams` - Quix Streams library with StatefulSource support +- `pymysql` - MySQL database connector +- `mysql-replication` - MySQL binary log replication library + +## Change Data Format + +The MySQL CDC produces change events in the following format: + +### Snapshot Insert Event +```json +{ + "kind": "snapshot_insert", + "schema": "database_name", + "table": "table_name", + "columnnames": ["col1", "col2"], + "columnvalues": ["value1", "value2"], + "oldkeys": {} +} +``` + +### INSERT Event +```json +{ + "kind": "insert", + "schema": "database_name", + "table": "table_name", + "columnnames": ["col1", "col2"], + "columnvalues": ["value1", "value2"], + "oldkeys": {} +} +``` + +### UPDATE Event +```json +{ + "kind": "update", + "schema": "database_name", + "table": "table_name", + "columnnames": ["col1", "col2"], + "columnvalues": ["new_value1", "new_value2"], + "oldkeys": { + "keynames": ["col1", "col2"], + "keyvalues": ["old_value1", "old_value2"] + } +} +``` + +### DELETE Event +```json +{ + "kind": "delete", + "schema": "database_name", + "table": "table_name", + "columnnames": [], + "columnvalues": [], + "oldkeys": { + "keynames": ["col1", "col2"], + "keyvalues": ["deleted_value1", "deleted_value2"] + } +} +``` + +## Running the Application + +1. **Configure MySQL** with binary logging enabled +2. **Set environment variables** (see example above) +3. **Run the application**: + ```bash + python main.py + ``` + +### Application Flow: +1. **StatefulSource Initialization**: Quix Streams creates the MySQL CDC source +2. **State Recovery**: Automatically loads saved binlog position and snapshot status +3. **Initial Snapshot** (if enabled and not completed): + - Connects to snapshot host + - Processes existing data in batches + - Sends snapshot events to Kafka + - Marks completion in state store +4. 
**Real-time CDC**:
+   - Connects to MySQL binlog stream
+   - Resumes from saved position (or current if first run)
+   - Monitors specified table for changes
+   - Buffers changes and publishes to Kafka every 500ms
+   - Automatically commits state after successful delivery
+5. **Automatic Recovery**: On restart, StatefulSource handles state recovery
+
+### Monitoring:
+- Check application logs for binlog position updates
+- Monitor Quix Streams state store for position and snapshot data
+- Verify Kafka topic for change events
+- Use MySQL's `SHOW MASTER STATUS` to compare positions
+
+## Troubleshooting
+
+### Common Issues:
+
+1. **Binary logging not enabled**:
+   - Error: "Binary logging must be enabled for CDC"
+   - Solution: Enable binlog in MySQL configuration and restart
+
+2. **Insufficient privileges**:
+   - Error: Access denied
+   - Solution: Grant REPLICATION SLAVE, REPLICATION CLIENT privileges
+
+3. **StatefulSource state issues**:
+   - StatefulSource automatically handles state recovery
+   - Check Quix Streams configuration and state store connectivity
+   - Review application logs for state-related errors
+
+4. **Snapshot issues**:
+   - Check `MYSQL_SNAPSHOT_HOST` connectivity
+   - Verify SELECT privileges on target table
+   - Review batch size for memory constraints
+
+### Best Practices:
+- Use read replicas for initial snapshots on large tables
+- Configure appropriate Quix Streams state store settings
+- Set appropriate `SNAPSHOT_BATCH_SIZE` based on available memory
+- Monitor Quix Streams metrics for source performance
+- Ensure proper Kafka connectivity for reliable message delivery
+
+
+## Testing Locally
+
+You can test your application against a locally emulated MySQL host, run via Docker
+with all of the correct settings:
+
+1. Execute the following in a terminal (just copy and paste) to run MySQL with the
+correct settings and a set of test credentials:
+
+```bash
+TMPDIR=$(mktemp -d $HOME/mysql-cdc.XXXXXX)
+
+cat > "$TMPDIR/custom-mysql.cnf" <<EOF
+# [binlog settings as shown in the Prerequisites section above]
+EOF
+
+cat > "$TMPDIR/init-user.sql" <<EOF
+-- [replication user and grants as shown in the Prerequisites section above]
+EOF
+```
+
+[The `docker run` command and the remaining local-testing steps are truncated in this excerpt.]

diff --git a/pyproject.toml b/pyproject.toml
[start of hunk truncated]
 "pymongo>=4.11,<5",
 "pandas>=1.0.0,<3.0",
 "elasticsearch>=8.17,<9",
- "influxdb>=5.3,<6"
+ "influxdb>=5.3,<6",
+ "pymysql>=1.0,<2",
+ "mysql-replication>=1.1,<2",
 ]
 avro = ["fastavro>=1.8,<2.0"]
@@ -61,6 +63,7 @@
 neo4j = ["neo4j>=5.27.0,<6"]
 mongodb = ["pymongo>=4.11,<5"]
 pandas = ["pandas>=1.0.0,<3.0"]
 elasticsearch = ["elasticsearch>=8.17,<9"]
+mysql = ["pymysql>=1.0,<2", "mysql-replication>=1.1,<2"]

 # AWS dependencies are separated by service to support
 # different requirements in the future.
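The documentation above tells users to start the connector with `python main.py`, but no example entry point ships with this change. A minimal sketch of what such a script could look like, assuming the `MySqlCdcSource` class added in the files below and the quixstreams `Application` / `add_source()` API; the broker address, default values, and environment-variable wiring follow the example `.env` file above and are illustrative only:

```python
# Hypothetical main.py wiring the new MySqlCdcSource into a quixstreams Application;
# broker address and env-var handling are illustrative, not part of this patch.
import os

from quixstreams import Application
from quixstreams.sources.community.mysql_cdc import MySqlCdcSource


def main():
    app = Application(
        broker_address=os.environ.get("BROKER_ADDRESS", "localhost:9092"),
        consumer_group="mysql-cdc",
    )

    source = MySqlCdcSource(
        host=os.environ["MYSQL_HOST"],
        port=int(os.environ.get("MYSQL_PORT", "3306")),
        user=os.environ["MYSQL_USER"],
        password=os.environ["MYSQL_PASSWORD"],
        database=os.environ["MYSQL_DATABASE"],
        table=os.environ["MYSQL_TABLE"],
        snapshot_host=os.environ.get("MYSQL_SNAPSHOT_HOST"),
        initial_snapshot=os.environ.get("INITIAL_SNAPSHOT", "false").lower() == "true",
        snapshot_batch_size=int(os.environ.get("SNAPSHOT_BATCH_SIZE", "1000")),
        force_snapshot=os.environ.get("FORCE_SNAPSHOT", "false").lower() == "true",
    )

    # Change events are published to the topic named by the `output` variable.
    topic = app.topic(os.environ["output"], value_serializer="json")
    app.add_source(source, topic=topic)
    app.run()


if __name__ == "__main__":
    main()
```

With this wiring, Quix Streams manages the StatefulSource's state and changelog topics itself, so beyond the broker address no extra state configuration should be needed.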
diff --git a/quixstreams/sources/community/mysql_cdc/__init__.py b/quixstreams/sources/community/mysql_cdc/__init__.py new file mode 100644 index 000000000..40a5b85b2 --- /dev/null +++ b/quixstreams/sources/community/mysql_cdc/__init__.py @@ -0,0 +1 @@ +from .mysql_cdc import * # noqa: F403 diff --git a/quixstreams/sources/community/mysql_cdc/mysql_cdc.py b/quixstreams/sources/community/mysql_cdc/mysql_cdc.py new file mode 100644 index 000000000..54ece3bd5 --- /dev/null +++ b/quixstreams/sources/community/mysql_cdc/mysql_cdc.py @@ -0,0 +1,251 @@ +import logging +import time +from typing import Optional + +from quixstreams.sources.base import StatefulSource + +from .mysql_helper import MySqlHelper + +__all__ = ("MySqlCdcSource",) + +logger = logging.getLogger(__name__) + + +class MySqlCdcSource(StatefulSource): + def __init__( + self, + host: str, + port: int, + user: str, + password: str, + database: str, + table: str, + state_dir: str = "state", + initial_snapshot: bool = False, + snapshot_host: Optional[str] = None, + snapshot_batch_size: int = 1000, + force_snapshot: bool = False, + name: str = "mysql-cdc-source", + ): + super().__init__(name=name) + + self._database = database + self._table = table + self._table_name = f"{self._database}.{self._table}" + self.wait_interval = 0.1 + self._helper = MySqlHelper( + **{ + "host": host, + "port": port, + "user": user, + "password": password, + "database": database, + "table": table, + "snapshot_host": snapshot_host or host, + "state_dir": state_dir, + } + ) + + self.initial_snapshot = initial_snapshot + self.snapshot_batch_size = snapshot_batch_size + self.force_snapshot = force_snapshot + + # Connection objects - will be initialized in setup() + self.conn = None + self.binlog_stream = None + + # Message buffering + self.buffer = [] + self.last_flush_time = time.time() + self.flush_interval = 0.5 # 500ms + + def setup(self): + """Initialize MySQL connection and CDC setup""" + try: + self._helper.enable_binlog_if_needed() + self._helper.setup_mysql_cdc() + self.conn = self._helper.connect_mysql() + self.binlog_stream = self._helper.create_binlog_stream() + logger.info("MySQL CDC CONNECTED!") + except Exception as e: + logger.error(f"ERROR during MySQL CDC setup - {e}") + raise + + def is_snapshot_completed(self): + """Check if initial snapshot has been completed using state store""" + snapshot_key = f"snapshot_completed_{self._database}_{self._table}" + return self.state.get(snapshot_key, False) and not self.force_snapshot + + def mark_snapshot_completed(self): + """Mark initial snapshot as completed in state store""" + snapshot_key = f"snapshot_completed_{self._database}_{self._table}" + snapshot_info = { + "completed_at": time.time(), + "schema": self._database, + "table": self._table, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()), + } + self.state.set(snapshot_key, True) + self.state.set(f"snapshot_info_{self._database}_{self._table}", snapshot_info) + logger.info(f"Snapshot completion marked in state store for {self._table_name}") + + def get_snapshot_info(self): + """Get information about when snapshot was completed""" + info_key = f"snapshot_info_{self._database}_{self._table}" + return self.state.get(info_key, None) + + def save_binlog_position(self, log_file, log_pos): + """Save binlog position to state store""" + binlog_key = f"binlog_position_{self._database}_{self._table}" + position_info = { + "log_file": log_file, + "log_pos": log_pos, + "timestamp": time.time(), + } + self.state.set(binlog_key, 
position_info) + + def get_binlog_position(self): + """Get saved binlog position from state store""" + binlog_key = f"binlog_position_{self._database}_{self._table}" + return self.state.get(binlog_key, None) + + def perform_initial_snapshot_if_needed(self): + """Perform initial snapshot if enabled and not already completed""" + if not self.initial_snapshot: + logger.info("Initial snapshot is disabled - starting CDC stream only") + return + + if self.is_snapshot_completed(): + snapshot_info = self.get_snapshot_info() + if self.force_snapshot: + logger.info( + "Initial snapshot already completed but FORCE_SNAPSHOT=true - performing snapshot again..." + ) + else: + logger.info( + f"Initial snapshot already completed at {snapshot_info.get('timestamp', 'unknown time')} - skipping" + ) + return + else: + logger.info( + "Initial snapshot is enabled and not yet completed - performing snapshot..." + ) + + if not self.is_snapshot_completed() or self.force_snapshot: + try: + snapshot_changes = self._helper.perform_initial_snapshot( + self.snapshot_batch_size + ) + + # Send snapshot data to Kafka immediately + for change in snapshot_changes: + msg = self.serialize(key=self._table_name, value=change) + self.produce( + key=msg.key, + value=msg.value, + ) + + # Flush to ensure all snapshot data is sent and commit state + self.flush() + logger.info( + f"Initial snapshot completed - {len(snapshot_changes)} records sent to Kafka" + ) + + # Mark snapshot as completed + self.mark_snapshot_completed() + # Flush again to save the snapshot completion state + self.flush() + + except Exception as e: + logger.error(f"Failed to perform initial snapshot: {e}") + raise + + def process_buffered_messages(self): + """Process and send buffered messages if flush interval has passed""" + current_time = time.time() + + if (current_time - self.last_flush_time) >= self.flush_interval and len( + self.buffer + ) > 0: + logger.debug(f"Processing {len(self.buffer)} buffered messages") + + # Send all buffered messages + for message in self.buffer: + msg = self.serialize(key=self._table_name, value=message) + self.produce( + key=msg.key, + value=msg.value, + ) + + # Save binlog position if available + if hasattr(self.binlog_stream, "log_file") and hasattr( + self.binlog_stream, "log_pos" + ): + self.save_binlog_position( + self.binlog_stream.log_file, self.binlog_stream.log_pos + ) + + # Flush the producer and commit state changes + self.flush() + + # Clear the buffer and update flush time + self.buffer = [] + self.last_flush_time = current_time + + logger.debug("Buffered messages sent and state committed") + + def run(self): + """Main CDC loop - runs while self.running is True""" + logger.info(f"Starting MySQL CDC source for {self._table_name}") + + # Perform initial snapshot if needed + self.perform_initial_snapshot_if_needed() + + # Log binlog position if available + saved_position = self.get_binlog_position() + if saved_position: + logger.info(f"Resuming from binlog position: {saved_position}") + + # Start CDC loop + while self.running: + try: + # Get changes from MySQL binlog + changes = self._helper.get_changes(self.binlog_stream) + + # Add changes to buffer + for change in changes: + self.buffer.append(change) + + if len(self.buffer) > 0: + logger.debug(f"Buffer length: {len(self.buffer)}") + + # Process buffered messages if flush interval has passed + self.process_buffered_messages() + + # Small sleep to prevent excessive CPU usage + time.sleep(self.wait_interval) + + except Exception as e: + logger.error(f"Error in CDC 
loop: {e}") + # Still continue running unless it's a fatal error + time.sleep(1) # Wait a bit longer on error + + def stop(self): + """Clean up resources when stopping""" + logger.info("Stopping MySQL CDC source") + + # Process any remaining buffered messages + if len(self.buffer) > 0: + logger.info(f"Processing {len(self.buffer)} remaining buffered messages") + self.process_buffered_messages() + + # Clean up connections + if self.conn: + self.conn.close() + logger.info("MySQL connection closed") + + if self.binlog_stream: + self.binlog_stream.close() + logger.info("Binlog stream closed") + + super().stop() diff --git a/quixstreams/sources/community/mysql_cdc/mysql_helper.py b/quixstreams/sources/community/mysql_cdc/mysql_helper.py new file mode 100644 index 000000000..8705c320d --- /dev/null +++ b/quixstreams/sources/community/mysql_cdc/mysql_helper.py @@ -0,0 +1,325 @@ +import base64 +import json +import logging +import os +import time + +import pymysql +from pymysqlreplication import BinLogStreamReader +from pymysqlreplication.row_event import ( + DeleteRowsEvent, + UpdateRowsEvent, + WriteRowsEvent, +) + +__all__ = ("MySqlHelper",) + +logger = logging.getLogger(__name__) + + +class MySqlHelper: + def __init__( + self, + host: str, + port: int, + user: str, + password: str, + database: str, + table: str, + state_dir: str, + snapshot_host: str, + ): + self._host = host + self._port = port + self._user = user + self._password = password + self._database = database + self._table = table + self._snapshot_host = snapshot_host + self._state_dir = state_dir + + def connect_mysql(self, override_host=None): + return pymysql.connect( + host=override_host or self._host, + port=self._port, + user=self._user, + password=self._password, + database=self._database, + charset="utf8mb4", + ) + + def enable_binlog_if_needed(self): + """Check and enable binary logging if not already enabled""" + conn = self.connect_mysql() + try: + with conn.cursor() as cursor: + # Check if binary logging is enabled + cursor.execute("SHOW VARIABLES LIKE 'log_bin'") + result = cursor.fetchone() + + if result and result[1] == "ON": + logger.info("Binary logging is already enabled") + else: + logger.warning( + "Binary logging is not enabled. Please enable it in MySQL configuration." 
+ ) + logger.warning("Add the following to your MySQL config:") + logger.warning("log-bin=mysql-bin") + logger.warning("binlog-format=ROW") + raise Exception("Binary logging must be enabled for CDC") + + # Check binlog format + cursor.execute("SHOW VARIABLES LIKE 'binlog_format'") + result = cursor.fetchone() + + if result and result[1] != "ROW": + logger.warning( + f"Binlog format is {result[1]}, should be ROW for CDC" + ) + logger.warning( + "Please set binlog_format=ROW in MySQL configuration" + ) + + finally: + conn.close() + + def setup_mysql_cdc(self): + """Setup MySQL for CDC - mainly validation""" + conn = self.connect_mysql() + try: + with conn.cursor() as cursor: + cursor.execute(f"SHOW TABLES LIKE '{self._table}'") + if not cursor.fetchone(): + raise Exception(f"Table {self._table} not found") + finally: + conn.close() + + def get_binlog_position_file(self): + os.makedirs(self._state_dir, exist_ok=True) + return os.path.join( + self._state_dir, f"binlog_position_{self._database}_{self._table}.json" + ) + + def save_binlog_position(self, log_file, log_pos): + position_file = self.get_binlog_position_file() + position_data = { + "log_file": log_file, + "log_pos": log_pos, + "timestamp": time.time(), + "readable_time": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime()), + } + try: + with open(position_file, "w") as f: + json.dump(position_data, f, indent=2) + logger.debug(f"Saved binlog position: {log_file}:{log_pos}") + except Exception as e: + logger.error(f"Failed to save binlog position: {e}") + + def load_binlog_position(self): + position_file = self.get_binlog_position_file() + if os.path.exists(position_file): + try: + with open(position_file, "r") as f: + position_data = json.load(f) + logger.info( + f"Loaded binlog position: {position_data['log_file']}:{position_data['log_pos']} from {position_data.get('readable_time', 'unknown time')}" + ) + return position_data["log_file"], position_data["log_pos"] + except Exception as e: + logger.error(f"Failed to load binlog position: {e}") + return None, None + + def create_binlog_stream(self, server_id=1): + mysql_settings = { + "host": self._host, + "port": self._port, + "user": self._user, + "passwd": self._password, + } + + log_file, log_pos = self.load_binlog_position() + + stream_kwargs = { + "connection_settings": mysql_settings, + "server_id": server_id, + "only_events": [DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], + "resume_stream": True, + "blocking": False, + } + + # If we have a saved position, use it + if log_file and log_pos: + stream_kwargs["log_file"] = log_file + stream_kwargs["log_pos"] = log_pos + logger.info( + f"Resuming binlog stream from saved position: {log_file}:{log_pos}" + ) + else: + logger.info( + "No saved binlog position found, starting from current position" + ) + + return BinLogStreamReader(**stream_kwargs) + + def get_changes(self, stream): + """Get changes from MySQL binlog stream and save position after processing""" + changes = [] + last_position = None + + # Read available events (non-blocking) + for binlogevent in stream: + # Update position tracking + if hasattr(stream, "log_file") and hasattr(stream, "log_pos"): + last_position = (stream.log_file, stream.log_pos) + + # Filter by schema and table + if ( + binlogevent.schema == self._database + and binlogevent.table == self._table + ): + if isinstance(binlogevent, WriteRowsEvent): + # INSERT operation + for row in binlogevent.rows: + change = { + "kind": "insert", + "schema": binlogevent.schema, + "table": binlogevent.table, + 
"columnnames": list(row["values"].keys()), + "columnvalues": [ + self.serialize_value(v) for v in row["values"].values() + ], + "oldkeys": {}, + } + changes.append(change) + + elif isinstance(binlogevent, UpdateRowsEvent): + # UPDATE operation + for row in binlogevent.rows: + change = { + "kind": "update", + "schema": binlogevent.schema, + "table": binlogevent.table, + "columnnames": list(row["after_values"].keys()), + "columnvalues": [ + self.serialize_value(v) + for v in row["after_values"].values() + ], + "oldkeys": { + "keynames": list(row["before_values"].keys()), + "keyvalues": [ + self.serialize_value(v) + for v in row["before_values"].values() + ], + }, + } + changes.append(change) + + elif isinstance(binlogevent, DeleteRowsEvent): + # DELETE operation + for row in binlogevent.rows: + change = { + "kind": "delete", + "schema": binlogevent.schema, + "table": binlogevent.table, + "columnnames": [], + "columnvalues": [], + "oldkeys": { + "keynames": list(row["values"].keys()), + "keyvalues": [ + self.serialize_value(v) + for v in row["values"].values() + ], + }, + } + changes.append(change) + + # Save position if we processed any events + if last_position and changes: + self.save_binlog_position(last_position[0], last_position[1]) + + return changes + + def perform_initial_snapshot(self, batch_size=1000): + conn = self.connect_mysql(override_host=self._snapshot_host) + changes = [] + + try: + with conn.cursor() as cursor: + # Get total row count for logging + cursor.execute( + f"SELECT COUNT(*) FROM `{self._database}`.`{self._table}`" # noqa: S608 + ) + total_rows = cursor.fetchone()[0] + logger.info( + f"Starting initial snapshot of {self._database}.{self._table} - {total_rows} rows" + ) + + # Use LIMIT with OFFSET for batching to avoid memory issues with large tables + offset = 0 + processed_rows = 0 + + while True: + # Fetch batch of rows + cursor.execute( + f"SELECT * FROM `{self._database}`.`{self._table}` LIMIT {batch_size} OFFSET {offset}" # noqa: S608 + ) + rows = cursor.fetchall() + + if not rows: + break + + # Get column names + column_names = [desc[0] for desc in cursor.description] + + # Convert each row to a change event + for row in rows: + # Convert row tuple to dictionary + row_dict = dict(zip(column_names, row)) + + # Convert values to JSON-serializable format + serialized_values = [ + self.serialize_value(value) for value in row_dict.values() + ] + + change = { + "kind": "snapshot_insert", # Different kind to distinguish from real inserts + "schema": self._database, + "table": self._table, + "columnnames": column_names, + "columnvalues": serialized_values, + "oldkeys": {}, + } + changes.append(change) + + processed_rows += len(rows) + offset += batch_size + + if processed_rows % 50000 == 0: # Log progress every 50k rows + logger.info( + f"Snapshot progress: {processed_rows}/{total_rows} rows processed" + ) + + logger.info( + f"Initial snapshot completed: {processed_rows} rows captured" + ) + + except Exception as e: + logger.error(f"Error during initial snapshot: {e}") + raise + finally: + conn.close() + + return changes + + @staticmethod + def serialize_value(value): + if value is None: + return None + elif isinstance(value, (bytes, bytearray)): + return base64.b64encode(value).decode("utf-8") + elif hasattr(value, "isoformat"): + return value.isoformat() + elif isinstance(value, (int, float, str, bool)): + return value + else: + return str(value) diff --git a/quixstreams/sources/community/mysql_cdc/setup_logger.py 
b/quixstreams/sources/community/mysql_cdc/setup_logger.py new file mode 100644 index 000000000..0280f4948 --- /dev/null +++ b/quixstreams/sources/community/mysql_cdc/setup_logger.py @@ -0,0 +1,13 @@ +import logging + +# Set up logger +PROD_ENV = True +logger = logging.getLogger("MySQL CDC") +logging.basicConfig() + +if PROD_ENV: + logger.setLevel(logging.INFO) + logger.info("Running in Production Mode...") +else: + logger.setLevel(logging.DEBUG) + logger.info("Running in Debug Mode...") From 8ae0faffa337ddac2f0cd2086b2811d2eea3cc2c Mon Sep 17 00:00:00 2001 From: Tim Sawicki Date: Mon, 30 Jun 2025 22:35:18 -0400 Subject: [PATCH 2/2] remove logger file --- .../sources/community/mysql_cdc/setup_logger.py | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 quixstreams/sources/community/mysql_cdc/setup_logger.py diff --git a/quixstreams/sources/community/mysql_cdc/setup_logger.py b/quixstreams/sources/community/mysql_cdc/setup_logger.py deleted file mode 100644 index 0280f4948..000000000 --- a/quixstreams/sources/community/mysql_cdc/setup_logger.py +++ /dev/null @@ -1,13 +0,0 @@ -import logging - -# Set up logger -PROD_ENV = True -logger = logging.getLogger("MySQL CDC") -logging.basicConfig() - -if PROD_ENV: - logger.setLevel(logging.INFO) - logger.info("Running in Production Mode...") -else: - logger.setLevel(logging.DEBUG) - logger.info("Running in Debug Mode...")
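For completeness, the change events documented in `mysql-cdc-source.md` can be consumed downstream with a regular StreamingDataFrame. A minimal sketch, assuming the documented event format and the example topic name `cdc-changes-topic` from the sample `.env` file (topic name and broker address are illustrative):

```python
# Hypothetical downstream consumer for the CDC change events documented above;
# topic name and broker address are illustrative, not part of this patch.
from quixstreams import Application

app = Application(broker_address="localhost:9092", consumer_group="cdc-consumer")
topic = app.topic("cdc-changes-topic", value_deserializer="json")
sdf = app.dataframe(topic)


def to_row(event: dict) -> dict:
    # Re-shape "columnnames"/"columnvalues" into a {column: value} dict,
    # keeping the change kind for downstream routing.
    row = dict(zip(event["columnnames"], event["columnvalues"]))
    row["__kind"] = event["kind"]
    return row


# DELETE events carry their data under "oldkeys", so only row-bearing kinds are handled here.
sdf = sdf.filter(lambda event: event["kind"] in ("insert", "update", "snapshot_insert"))
sdf = sdf.apply(to_row)
sdf.print()

if __name__ == "__main__":
    app.run()
```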