Skip to content

Commit ff61732

Browse files
Add codec support for column addition in schema changes (#486)
* fix: include column codecs in add_columns macro --------- Co-authored-by: Anatoliy Romanov <[email protected]>
1 parent b31acd2 commit ff61732

File tree

4 files changed

+192
-1
lines changed

4 files changed

+192
-1
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
### Release [1.9.4], 2025-XX-XX
22

3+
#### Improvements
4+
* Add support for preserving column codec configurations during incremental schema changes (append_new_columns and sync_all_columns) ([#486](https://github.com/ClickHouse/dbt-clickhouse/pull/486)).
5+
36
#### Bugs
47
* Fix Materialized View not dropped when a model's materialization is changed from materialized_view to view ([#516](https://github.com/ClickHouse/dbt-clickhouse/pull/516)).
58
* Ensure that temporary tables are not accessed with database clause ([#515](https://github.com/ClickHouse/dbt-clickhouse/pull/515)).

dbt/include/clickhouse/macros/materializations/incremental/schema_changes.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020

2121
{% macro clickhouse__add_columns(columns, existing_relation, existing_local=none, is_distributed=False) %}
2222
{% for column in columns %}
23+
{% set codec = model['columns'].get(column.name, {}).get('codec') %}
2324
{% set alter_action -%}
24-
add column if not exists `{{ column.name }}` {{ column.data_type }}
25+
add column if not exists `{{ column.name }}` {{ column.data_type }} {{ codec_clause(codec) }}
2526
{%- endset %}
2627
{% do clickhouse__run_alter_table_command(alter_action, existing_relation, existing_local, is_distributed) %}
2728
{% endfor %}

dbt/include/clickhouse/macros/materializations/table.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,9 @@
244244
{%- endif -%}
245245
{{ adapter.get_model_query_settings(model) }}
246246
{%- endmacro %}
247+
248+
{% macro codec_clause(codec_name) %}
249+
{%- if codec_name %}
250+
CODEC({{ codec_name }})
251+
{%- endif %}
252+
{% endmacro %}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
import os
2+
3+
import pytest
4+
from dbt.tests.util import run_dbt, run_dbt_and_capture
5+
6+
schema_change_with_codec_sql = """
7+
{{
8+
config(
9+
materialized='%s',
10+
unique_key='col_1',
11+
on_schema_change='%s'
12+
)
13+
}}
14+
15+
{%% if not is_incremental() %%}
16+
select
17+
number as col_1,
18+
number + 1 as col_2
19+
from numbers(3)
20+
{%% else %%}
21+
select
22+
number as col_1,
23+
number + 1 as col_2,
24+
number + 2 as col_3
25+
from numbers(2, 3)
26+
{%% endif %%}
27+
"""
28+
29+
30+
schema_change_with_codec_yml = """
31+
version: 2
32+
models:
33+
- name: schema_change_codec_append
34+
columns:
35+
- name: col_1
36+
data_type: UInt64
37+
- name: col_2
38+
data_type: UInt64
39+
- name: col_3
40+
data_type: UInt64
41+
codec: ZSTD
42+
- name: schema_change_codec_distributed_append
43+
columns:
44+
- name: col_1
45+
data_type: UInt64
46+
- name: col_2
47+
data_type: UInt64
48+
- name: col_3
49+
data_type: UInt64
50+
codec: LZ4
51+
"""
52+
53+
54+
class TestSchemaChangeWithCodec:
55+
@pytest.fixture(scope="class")
56+
def models(self):
57+
return {
58+
"schema_change_codec_append.sql": schema_change_with_codec_sql
59+
% ("incremental", "append_new_columns"),
60+
"schema_change_codec_distributed_append.sql": schema_change_with_codec_sql
61+
% ("distributed_incremental", "append_new_columns"),
62+
"schema.yml": schema_change_with_codec_yml,
63+
}
64+
65+
@pytest.mark.parametrize(
66+
"model", ("schema_change_codec_append", "schema_change_codec_distributed_append")
67+
)
68+
def test_append_with_codec(self, project, model):
69+
if (
70+
model == "schema_change_codec_distributed_append"
71+
and os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == ''
72+
):
73+
pytest.skip("Not on a cluster")
74+
75+
run_dbt(["run", "--select", model])
76+
result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
77+
assert len(result) == 3
78+
assert result[0][1] == 1
79+
80+
run_dbt(["--debug", "run", "--select", model])
81+
result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
82+
83+
assert all(len(row) == 3 for row in result)
84+
assert result[0][2] == 0
85+
assert result[3][2] == 5
86+
87+
table_name = f"{project.test_schema}.{model}"
88+
create_table_sql = project.run_sql(f"SHOW CREATE TABLE {table_name}", fetch="one")[0]
89+
90+
assert "CODEC" in create_table_sql
91+
if "distributed" in model:
92+
assert "LZ4" in create_table_sql
93+
else:
94+
assert "ZSTD" in create_table_sql
95+
96+
97+
sync_all_columns_with_codec_sql = """
98+
{{
99+
config(
100+
materialized='%s',
101+
unique_key='col_1',
102+
on_schema_change='sync_all_columns'
103+
)
104+
}}
105+
106+
{%% if not is_incremental() %%}
107+
select
108+
toUInt8(number) as col_1,
109+
number + 1 as col_2
110+
from numbers(3)
111+
{%% else %%}
112+
select
113+
toFloat32(number) as col_1,
114+
number + 2 as col_3
115+
from numbers(2, 3)
116+
{%% endif %%}
117+
"""
118+
119+
sync_all_columns_with_codec_yml = """
120+
version: 2
121+
models:
122+
- name: sync_codec_test
123+
columns:
124+
- name: col_1
125+
data_type: Float32
126+
- name: col_3
127+
data_type: UInt64
128+
codec: ZSTD
129+
- name: sync_codec_distributed_test
130+
columns:
131+
- name: col_1
132+
data_type: Float32
133+
- name: col_3
134+
data_type: UInt64
135+
codec: LZ4
136+
"""
137+
138+
139+
class TestSyncAllColumnsWithCodec:
140+
@pytest.fixture(scope="class")
141+
def models(self):
142+
return {
143+
"sync_codec_test.sql": sync_all_columns_with_codec_sql % "incremental",
144+
"sync_codec_distributed_test.sql": sync_all_columns_with_codec_sql
145+
% "distributed_incremental",
146+
"schema.yml": sync_all_columns_with_codec_yml,
147+
}
148+
149+
@pytest.mark.parametrize("model", ("sync_codec_test", "sync_codec_distributed_test"))
150+
def test_sync_all_columns_with_codec(self, project, model):
151+
if (
152+
model == "sync_codec_distributed_test"
153+
and os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == ''
154+
):
155+
pytest.skip("Not on a cluster")
156+
157+
run_dbt(["run", "--select", model])
158+
result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
159+
assert len(result) == 3
160+
assert result[0][1] == 1
161+
162+
run_dbt(["run", "--select", model])
163+
result = project.run_sql(f"select * from {model} order by col_1", fetch="all")
164+
165+
assert all(len(row) == 2 for row in result)
166+
assert result[0][1] == 0
167+
assert result[3][1] == 5
168+
169+
table_name = f"{project.test_schema}.{model}"
170+
create_table_sql = project.run_sql(f"SHOW CREATE TABLE {table_name}", fetch="one")[0]
171+
172+
assert "CODEC" in create_table_sql
173+
if "distributed" in model:
174+
assert "LZ4" in create_table_sql
175+
else:
176+
assert "ZSTD" in create_table_sql
177+
178+
result_types = project.run_sql(
179+
f"select toColumnTypeName(col_1) from {model} limit 1", fetch="one"
180+
)
181+
assert "Float32" in result_types[0]

0 commit comments

Comments
 (0)