Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ mysql:
port: 8306
user: 'root'
password: 'root'
charset: 'utf8mb4' # optional; defaults to utf8mb4, which gives full Unicode support (including emoji and other 4-byte characters)

clickhouse:
host: 'localhost'
Expand Down
1 change: 1 addition & 0 deletions example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mysql:
port: 8306
user: 'root'
password: 'root'
charset: 'utf8mb4' # Optional: charset for MySQL connection (default: utf8mb4). Use utf8mb4 for full Unicode support including emoji and 4-byte characters

clickhouse:
host: 'localhost'
Expand Down
1 change: 1 addition & 0 deletions mysql_ch_replicator/binlog_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ def __init__(self, settings: Settings):
'port': self.mysql_settings.port,
'user': self.mysql_settings.user,
'passwd': self.mysql_settings.password,
'charset': self.mysql_settings.charset,
}
self.data_writer = DataWriter(self.replicator_settings)
self.state = State(os.path.join(self.replicator_settings.data_dir, 'state.json'))
Expand Down
4 changes: 4 additions & 0 deletions mysql_ch_replicator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class MysqlSettings:
port: int = 3306
user: str = 'root'
password: str = ''
charset: str = 'utf8mb4' # Default to utf8mb4 for full Unicode support

def validate(self):
if not isinstance(self.host, str):
Expand All @@ -28,6 +29,9 @@ def validate(self):

if not isinstance(self.password, str):
raise ValueError(f'mysql password should be string and not {stype(self.password)}')

if not isinstance(self.charset, str):
raise ValueError(f'mysql charset should be string and not {stype(self.charset)}')


@dataclass
Expand Down
6 changes: 6 additions & 0 deletions mysql_ch_replicator/mysql_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ def reconnect_if_required(self, force=False):
user=self.mysql_settings.user,
passwd=self.mysql_settings.password,
)
# Use charset from config if available
if hasattr(self.mysql_settings, 'charset'):
conn_settings['charset'] = self.mysql_settings.charset
# Set appropriate collation based on charset
if self.mysql_settings.charset == 'utf8mb4':
conn_settings['collation'] = 'utf8mb4_unicode_ci'
try:
self.db = mysql.connector.connect(**conn_settings)
except mysql.connector.errors.DatabaseError as e:
Expand Down
121 changes: 121 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3084,6 +3084,126 @@ def test_resume_initial_replication_with_ignore_deletes():
os.unlink(config_file)


def test_charset_configuration():
    """
    Test that the MySQL charset setting is loaded from config and used end to end.

    Writes a copy of the base config with ``mysql.charset`` set to ``utf8mb4``,
    checks that the setting is loaded, round-trips multi-byte JSON text
    (Arabic, Chinese and 4-byte emoji) through MySQL, then runs the binlog and
    db replicators and checks that the same text arrives intact in ClickHouse,
    both for the row inserted before the replicators start and for a row
    inserted while they are running.
    """

    def load_json(value):
        # The drivers may hand JSON columns back either as text or already decoded.
        return json.loads(value) if isinstance(value, str) else value

    # Read the base config before creating the temp file so that a read or
    # parse failure cannot leave a stray temp file behind.
    with open(CONFIG_FILE, 'r') as f:
        base_config = yaml.safe_load(f)

    # Ensure charset is set to utf8mb4
    base_config['mysql']['charset'] = 'utf8mb4'

    # delete=False: the file must outlive this `with` because its path is
    # passed to the replicator runners below; it is removed in the outer finally.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_config_file:
        config_file = temp_config_file.name
        yaml.dump(base_config, temp_config_file)

    try:
        cfg = config.Settings()
        cfg.load(config_file)

        # Verify charset is loaded correctly
        assert hasattr(cfg.mysql, 'charset'), "MysqlSettings should have charset attribute"
        assert cfg.mysql.charset == 'utf8mb4', f"Expected charset utf8mb4, got {cfg.mysql.charset}"

        mysql = mysql_api.MySQLApi(None, cfg.mysql)
        ch = clickhouse_api.ClickhouseApi(None, cfg.clickhouse)

        prepare_env(cfg, mysql, ch)

        mysql.database = TEST_DB_NAME
        ch.database = TEST_DB_NAME

        # Create table with JSON field
        mysql.execute(f"""
        CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} (
            id INT AUTO_INCREMENT PRIMARY KEY,
            json_data JSON
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
        """, commit=True)

        # Insert multi-byte Unicode data: Arabic and Chinese text plus emoji
        # (emoji are the 4-byte UTF-8 case that requires utf8mb4).
        test_data = {
            "ar": "مرحباً بالعالم",  # Arabic: Hello World
            "emoji": "🌍🎉✨",
            "cn": "你好世界",  # Chinese: Hello World
            "en": "Hello World",
        }

        mysql.execute(
            f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)",
            args=(json.dumps(test_data, ensure_ascii=False),),
            commit=True
        )

        # Verify the data can be read back correctly
        mysql.cursor.execute(f"SELECT json_data FROM {TEST_TABLE_NAME}")
        result = mysql.cursor.fetchone()
        assert result is not None, "Should have retrieved a record"

        retrieved_data = load_json(result[0])
        assert retrieved_data['ar'] == test_data['ar'], f"Arabic text mismatch: {retrieved_data['ar']} != {test_data['ar']}"
        assert retrieved_data['emoji'] == test_data['emoji'], f"Emoji mismatch: {retrieved_data['emoji']} != {test_data['emoji']}"
        assert retrieved_data['cn'] == test_data['cn'], f"Chinese text mismatch: {retrieved_data['cn']} != {test_data['cn']}"

        # Test binlog replication with charset
        binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
        binlog_replicator_runner.run()

        try:
            # Start db replicator
            db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
            db_replicator_runner.run()

            # Everything after run() sits in try/finally so that a failed
            # assertion or wait still stops the db replicator.
            try:
                # Wait for database and table to be created in ClickHouse
                assert_wait(lambda: TEST_DB_NAME in ch.get_databases(), max_wait_time=20)
                ch.execute_command(f'USE `{TEST_DB_NAME}`')
                assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables(), max_wait_time=20)

                # Wait for replication
                assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1, max_wait_time=20)

                # Verify data in ClickHouse
                ch_records = ch.select(TEST_TABLE_NAME)
                assert len(ch_records) == 1, f"Expected 1 record in ClickHouse, got {len(ch_records)}"

                # Access the json_data column using dictionary access
                ch_json_data = load_json(ch_records[0]['json_data'])

                # Verify Unicode characters are preserved correctly
                assert ch_json_data['ar'] == test_data['ar'], f"Arabic text not preserved in CH: {ch_json_data.get('ar')}"
                assert ch_json_data['emoji'] == test_data['emoji'], f"Emoji not preserved in CH: {ch_json_data.get('emoji')}"
                assert ch_json_data['cn'] == test_data['cn'], f"Chinese text not preserved in CH: {ch_json_data.get('cn')}"

                # Test realtime replication with more Unicode data
                more_data = {"test": "🔥 Real-time 测试 اختبار"}
                mysql.execute(
                    f"INSERT INTO {TEST_TABLE_NAME} (json_data) VALUES (%s)",
                    args=(json.dumps(more_data, ensure_ascii=False),),
                    commit=True
                )

                assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2, max_wait_time=20)

                # Verify the second record: check its content, not just the
                # row count. Row order from select() is not assumed, so look
                # for the payload in any record.
                ch_records = ch.select(TEST_TABLE_NAME)
                assert len(ch_records) == 2, f"Expected 2 records in ClickHouse, got {len(ch_records)}"

                ch_payloads = [load_json(record['json_data']) for record in ch_records]
                assert any(
                    isinstance(payload, dict) and payload.get('test') == more_data['test']
                    for payload in ch_payloads
                ), f"Realtime Unicode text not preserved in CH: {ch_payloads}"
            finally:
                db_replicator_runner.stop()
        finally:
            binlog_replicator_runner.stop()

    finally:
        # Clean up temp config file
        os.unlink(config_file)


@pytest.mark.parametrize("input_sql,expected_output", [
# Basic single quote comment
(
Expand Down Expand Up @@ -3222,3 +3342,4 @@ def normalize_whitespace(text):
return re.sub(r'[ \t]+', ' ', text).strip()

assert normalize_whitespace(result) == normalize_whitespace(expected_output), f"Failed for input: {input_sql}"