From bfdb1d237bc5f2d7c8e17cda68cc905688a7df16 Mon Sep 17 00:00:00 2001 From: thakur-ranjit20 <84329989+thakur-ranjit20@users.noreply.github.com> Date: Wed, 14 Aug 2024 13:28:49 +1000 Subject: [PATCH 1/7] Update get_escape_characters.sql --- macros/internal/metadata_processing/get_escape_characters.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/macros/internal/metadata_processing/get_escape_characters.sql b/macros/internal/metadata_processing/get_escape_characters.sql index aab23f42..12ffb08b 100644 --- a/macros/internal/metadata_processing/get_escape_characters.sql +++ b/macros/internal/metadata_processing/get_escape_characters.sql @@ -51,3 +51,7 @@ {%- macro postgres__get_escape_characters() %} {%- do return (('"', '"')) -%} {%- endmacro %} + +{%- macro duckdb__get_escape_characters() %} + {%- do return (('', '')) -%} +{%- endmacro %} From 2d95601a386187f211c5f759ec4cf113c23536ed Mon Sep 17 00:00:00 2001 From: thakur-ranjit20 <84329989+thakur-ranjit20@users.noreply.github.com> Date: Wed, 14 Aug 2024 13:29:29 +1000 Subject: [PATCH 2/7] Update cast_date.sql --- macros/supporting/casting/cast_date.sql | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/macros/supporting/casting/cast_date.sql b/macros/supporting/casting/cast_date.sql index f3269a67..a49bcfef 100644 --- a/macros/supporting/casting/cast_date.sql +++ b/macros/supporting/casting/cast_date.sql @@ -68,4 +68,10 @@ {%- if alias %} AS {{ alias }} {%- endif %} -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} + +{%- macro duckdb__cast_date(column_str, as_string=false, datetime=false, alias=none) -%} + + {{ dbtvault.snowflake__cast_date(column_str=column_str, as_string=as_string, datetime=datetime, alias=alias)}} + +{%- endmacro -%} From 6549f8f6f783673d68e96ef3fc90bdf047b867f2 Mon Sep 17 00:00:00 2001 From: thakur-ranjit20 <84329989+thakur-ranjit20@users.noreply.github.com> Date: Wed, 14 Aug 2024 13:29:53 +1000 Subject: [PATCH 3/7] Update cast_datetime.sql --- macros/supporting/casting/cast_datetime.sql | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/macros/supporting/casting/cast_datetime.sql b/macros/supporting/casting/cast_datetime.sql index c086545f..94eb77da 100644 --- a/macros/supporting/casting/cast_datetime.sql +++ b/macros/supporting/casting/cast_datetime.sql @@ -66,4 +66,12 @@ {%- if alias %} AS {{ alias }} {%- endif %} -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} + +{%- macro duckdb__cast_datetime(column_str, as_string=false, alias=none, date_type=none) -%} + + to_char(timestamp {{ column_str }}, 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp + + {%- if alias %} AS {{ alias }} {%- endif %} + +{%- endmacro -%} From 4db56d6c8eb9586015a015426bd44219f4ba6f13 Mon Sep 17 00:00:00 2001 From: Palak Mahajan <130cpalak@gmail.com> Date: Wed, 21 Aug 2024 16:51:21 +0530 Subject: [PATCH 4/7] fixes for duckdb to support automate dv --- .../escape_column_names.sql | 6 +- .../period_mat_helpers/get_period_of_load.sql | 16 +++ macros/staging/null_columns.sql | 9 ++ macros/supporting/hash.sql | 88 ++++++++++++ macros/tables/duckdb/hub.sql | 89 ++++++++++++ macros/tables/duckdb/link.sql | 100 +++++++++++++ macros/tables/duckdb/sat.sql | 135 ++++++++++++++++++ 7 files changed, 442 insertions(+), 1 deletion(-) create mode 100644 macros/tables/duckdb/hub.sql create mode 100644 macros/tables/duckdb/link.sql create mode 100644 macros/tables/duckdb/sat.sql diff --git a/macros/internal/metadata_processing/escape_column_names.sql b/macros/internal/metadata_processing/escape_column_names.sql index 33069451..03be2207 100644 --- a/macros/internal/metadata_processing/escape_column_names.sql +++ b/macros/internal/metadata_processing/escape_column_names.sql @@ -97,4 +97,8 @@ {%- endif -%} -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} + +{%- macro duckdb__get_escape_characters() %} + {%- do return (('', '')) -%} +{%- endmacro %} \ No newline at end of file diff --git a/macros/materialisations/period_mat_helpers/get_period_of_load.sql b/macros/materialisations/period_mat_helpers/get_period_of_load.sql index 0ae62b8f..0ee2aea9 100644 --- a/macros/materialisations/period_mat_helpers/get_period_of_load.sql +++ b/macros/materialisations/period_mat_helpers/get_period_of_load.sql @@ -81,3 +81,19 @@ {% do return(period_of_load) %} {%- endmacro -%} + + +{%- macro duckdb__get_period_of_load(period, offset, start_timestamp) -%} + {# Postgres uses different DateTime arithmetic #} + {% set period_of_load_sql -%} + SELECT DATE_TRUNC('{{ period }}', + TO_TIMESTAMP('{{ start_timestamp }}', 'YYYY-MM-DD HH24:MI:SS') + interval '{{ offset }} {{ period }}' + ) AS period_of_load + {%- endset %} + + {% set period_of_load_dict = dbtvault.get_query_results_as_dict(period_of_load_sql) %} + + {% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %} + + {% do return(period_of_load) %} +{%- endmacro -%} \ No newline at end of file diff --git a/macros/staging/null_columns.sql b/macros/staging/null_columns.sql index e3bb2b44..616c6703 100644 --- a/macros/staging/null_columns.sql +++ b/macros/staging/null_columns.sql @@ -85,4 +85,13 @@ {{ col_name }} AS {{ col_name ~ "_ORIGINAL" }}, COALESCE({{ col_name }}, '{{ default_value }}') AS {{ col_name }} +{%- endmacro -%} + +{%- macro duckdb__null_column_sql(col_name, default_value) -%} + + {%- set col_name_esc = dbtvault.escape_column_names(col_name) -%} + {%- set col_name_orig_esc = dbtvault.escape_column_names(col_name ~ "_ORIGINAL") -%} + {{ col_name_esc }} AS {{ col_name_orig_esc }}, + COALESCE({{ col_name_esc }}, '{{ default_value }}') AS {{ col_name_esc }} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/hash.sql b/macros/supporting/hash.sql index 07610ff9..40eba8bd 100644 --- a/macros/supporting/hash.sql +++ b/macros/supporting/hash.sql @@ -118,4 +118,92 @@ {{ automate_dv.default__hash(columns=columns, alias=alias, is_hashdiff=is_hashdiff, columns_to_escape=columns_to_escape) }} +{%- endmacro -%} + +{%- macro duckdb__hash(columns, alias, is_hashdiff, columns_to_escape=none) -%} + +{%- set hash = var('hash', 'MD5') -%} +{%- set concat_string = var('concat_string', '||') -%} +{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%} + +{#- Select hashing algorithm -#} +{%- if hash == 'MD5' -%} + {%- set hash_alg = 'MD5' -%} +{%- elif hash == 'SHA' -%} + {%- set hash_alg = 'SHA256' -%} +{%- else -%} + {%- set hash_alg = 'MD5' -%} +{%- endif -%} + +{#- Select hashing expression (left and right sides) -#} +{#- * MD5 is simple function call to md5(val) -#} +{#- * SHA256 needs input cast to BYTEA and then its BYTEA result encoded as hex text output -#} +{#- e.g. ENCODE(SHA256(CAST(val AS BYTEA)), 'hex') -#} +{#- Ref: https://www.postgresql.org/docs/11/functions-binarystring.html -#} +{%- if hash_alg == 'MD5' -%} + {%- set hash_expr_left = 'MD5(' -%} + {%- set hash_expr_right = ')' -%} +{%- elif hash_alg == 'SHA256' -%} + {%- set hash_expr_left = 'ENCODE(SHA256(CAST(' -%} + {%- set hash_expr_right = " AS BYTEA)), 'hex')" -%} +{%- endif -%} + +{%- set standardise = "NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS VARCHAR))), '')" -%} + +{#- Alpha sort columns before hashing if a hashdiff -#} +{%- if is_hashdiff and automate_dv.is_list(columns) -%} + {%- set columns = columns|sort -%} +{%- endif -%} + +{#- If single column to hash -#} +{%- if columns is string -%} + {%- set column_str = automate_dv.as_constant(columns) -%} + {%- if automate_dv.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = automate_dv.escape_column_names(column_str) -%} + {%- endif -%} + + {{- "CAST(UPPER({}{}{}) AS BYTEA) AS {}".format(hash_expr_left, standardise | replace('[EXPRESSION]', escaped_column_str), hash_expr_right, automate_dv.escape_column_names(alias)) | indent(4) -}} + +{#- Else a list of columns to hash -#} +{%- else -%} + {%- set all_null = [] -%} + + {%- if is_hashdiff -%} + {{- "CAST(UPPER({}CONCAT_WS('{}',".format(hash_expr_left, concat_string) | indent(4) -}} + {%- else -%} + {{- "CAST(UPPER({}NULLIF(CONCAT_WS('{}',".format(hash_expr_left, concat_string) | indent(4) -}} + {%- endif -%} + + {%- for column in columns -%} + + {%- do all_null.append(null_placeholder_string) -%} + + {%- set column_str = automate_dv.as_constant(column) -%} + {%- if automate_dv.is_expression(column_str) -%} + {%- set escaped_column_str = column_str -%} + {%- else -%} + {%- set escaped_column_str = automate_dv.escape_column_names(column_str) -%} + {%- endif -%} + + {{- "\nCOALESCE({}, '{}')".format(standardise | replace('[EXPRESSION]', escaped_column_str), null_placeholder_string) | indent(4) -}} + {{- "," if not loop.last -}} + + {%- if loop.last -%} + + {% if is_hashdiff %} + {{- "\n){}) AS BYTEA) AS {}".format(hash_expr_right, automate_dv.escape_column_names(alias)) -}} + {%- else -%} + {{- "\n), '{}'){}) AS BYTEA) AS {}".format(all_null | join(""), hash_expr_right, automate_dv.escape_column_names(alias)) -}} + {%- endif -%} + {%- else -%} + + {%- do all_null.append(concat_string) -%} + + {%- endif -%} + {%- endfor -%} + +{%- endif -%} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/duckdb/hub.sql b/macros/tables/duckdb/hub.sql new file mode 100644 index 00000000..3db8a157 --- /dev/null +++ b/macros/tables/duckdb/hub.sql @@ -0,0 +1,89 @@ +{%- macro duckdb__hub(src_pk, src_nk, src_extra_columns, src_ldts, src_source, source_model) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_nk, src_extra_columns, src_ldts, src_source]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +{{ 'WITH ' -}} + +{%- if not (source_model is iterable and source_model is not string) -%} + {%- set source_model = [source_model] -%} +{%- endif -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( +{#- PostgreSQL has DISTINCT ON which should be more performant than the + strategy used by Snowflake ROW_NUMBER() OVER( PARTITION BY ... +-#} + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'rr') }}) {{ dbtvault.prefix(source_cols_with_rank, 'rr') }} + {%- else %} + SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'rr') }}) {{ dbtvault.prefix(source_cols, 'rr') }} + {%- endif %} + FROM {{ ref(src) }} AS rr + WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + ORDER BY {{ dbtvault.prefix([src_pk], 'rr') }}, {{ dbtvault.prefix([src_ldts], 'rr') }} + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} +{% endfor -%} +{% if source_model | length > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- endif -%} +{%- if source_model | length > 1 %} + +row_rank_union AS ( +{#- PostgreSQL has DISTINCT ON which should be more performant than the + strategy used by Snowflake ROW_NUMBER() OVER( PARTITION BY ... +-#} + SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'ru') }}) ru.* + FROM {{ ns.last_cte }} AS ru + WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + ORDER BY {{ dbtvault.prefix([src_pk], 'ru') }}, {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/duckdb/link.sql b/macros/tables/duckdb/link.sql new file mode 100644 index 00000000..f50e9fa2 --- /dev/null +++ b/macros/tables/duckdb/link.sql @@ -0,0 +1,100 @@ +{%- macro duckdb__link(src_pk, src_fk, src_extra_columns, src_ldts, src_source, source_model) -%} + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_extra_columns, src_ldts, src_source]) -%} +{%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%} + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} +{%- endif -%} + +{{ dbtvault.prepend_generated_by() }} + +{{ 'WITH ' -}} + +{%- if not (source_model is iterable and source_model is not string) -%} + {%- set source_model = [source_model] -%} +{%- endif -%} + +{%- set ns = namespace(last_cte= "") -%} + +{%- for src in source_model -%} + +{%- set source_number = loop.index | string -%} + +row_rank_{{ source_number }} AS ( + SELECT * FROM ( + {%- if model.config.materialized == 'vault_insert_by_rank' %} + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'rr') }}, + {%- else %} + SELECT {{ dbtvault.prefix(source_cols, 'rr') }}, + {%- endif %} + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'rr') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'rr') }} + ) AS row_number + FROM {{ ref(src) }} AS rr + {%- if source_model | length == 1 %} + WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + AND {{ dbtvault.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }} + {%- endif %} + ) as l + WHERE row_number = 1 + {%- set ns.last_cte = "row_rank_{}".format(source_number) %} +),{{ "\n" if not loop.last }} +{% endfor -%} +{% if source_model | length > 1 %} +stage_union AS ( + {%- for src in source_model %} + SELECT * FROM row_rank_{{ loop.index | string }} + {%- if not loop.last %} + UNION ALL + {%- endif %} + {%- endfor %} + {%- set ns.last_cte = "stage_union" %} +), +{%- endif -%} +{%- if model.config.materialized == 'vault_insert_by_period' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __PERIOD_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{%- elif model.config.materialized == 'vault_insert_by_rank' %} +stage_mat_filter AS ( + SELECT * + FROM {{ ns.last_cte }} + WHERE __RANK_FILTER__ + {%- set ns.last_cte = "stage_mat_filter" %} +), +{% endif %} +{%- if source_model | length > 1 %} + +row_rank_union AS ( + SELECT * FROM ( + SELECT ru.*, + ROW_NUMBER() OVER( + PARTITION BY {{ dbtvault.prefix([src_pk], 'ru') }} + ORDER BY {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + ) AS row_rank_number + FROM {{ ns.last_cte }} AS ru + WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + AND {{ dbtvault.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }} + ) AS a + WHERE row_rank_number = 1 + {%- set ns.last_cte = "row_rank_union" %} +), +{% endif %} +records_to_insert AS ( + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + FROM {{ ns.last_cte }} AS a + {%- if dbtvault.is_any_incremental() %} + LEFT JOIN {{ this }} AS d + ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + {%- endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/duckdb/sat.sql b/macros/tables/duckdb/sat.sql new file mode 100644 index 00000000..461de73d --- /dev/null +++ b/macros/tables/duckdb/sat.sql @@ -0,0 +1,135 @@ +{%- macro duckdb__sat(src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source, source_model) -%} + + + + +{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source]) -%} + +{%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} + +{%- set pk_cols = dbtvault.expand_column_list(columns=[src_pk]) -%} + + + + +{%- if model.config.materialized == 'vault_insert_by_rank' %} + + {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} + +{%- endif -%} + + + + +{{ dbtvault.prepend_generated_by() }} + + + + +WITH source_data AS ( + + {%- if model.config.materialized == 'vault_insert_by_rank' %} + + SELECT {{ dbtvault.prefix(source_cols_with_rank, 'a', alias_target='source') }} + + {%- else %} + + SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} + + {%- endif %} + + FROM {{ ref(source_model) }} AS a + + WHERE {{ dbtvault.multikey(src_pk, prefix='a', condition='IS NOT NULL') }} + + {%- if model.config.materialized == 'vault_insert_by_period' %} + + AND __PERIOD_FILTER__ + + {% elif model.config.materialized == 'vault_insert_by_rank' %} + + AND __RANK_FILTER__ + + {% endif %} + +), + + + + +{%- if dbtvault.is_any_incremental() %} + + + + +latest_records AS ( + + SELECT {{ dbtvault.prefix(rank_cols, 'a', alias_target='target') }} + + FROM ( + + SELECT {{ dbtvault.prefix(rank_cols, 'current_records', alias_target='target') }}, + + RANK() OVER ( + + PARTITION BY {{ dbtvault.prefix([src_pk], 'current_records') }} + + ORDER BY {{ dbtvault.prefix([src_ldts], 'current_records') }} DESC + + ) AS rank + + FROM {{ this }} AS current_records + + JOIN ( + + SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_data') }} + + FROM source_data + + ) AS source_records + + ON {{ dbtvault.multikey(src_pk, prefix=['current_records','source_records'], condition='=') }} + + ) AS a + + WHERE a.rank = 1 + +), + + + + +{%- endif %} + + + + +records_to_insert AS ( + + SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }} + + FROM source_data AS stage + + {%- if dbtvault.is_any_incremental() %} + + LEFT JOIN latest_records + + ON {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }} + + WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }} + + OR {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL + + {%- endif %} + +) + + + + +SELECT * FROM records_to_insert + + + + +{%- endmacro -%} \ No newline at end of file From 64625c40a52f2856a6e32bd86b2bc444dbb518d5 Mon Sep 17 00:00:00 2001 From: Palak Mahajan <130cpalak@gmail.com> Date: Wed, 21 Aug 2024 17:43:48 +0530 Subject: [PATCH 5/7] removed duplicate macro def --- macros/internal/metadata_processing/get_escape_characters.sql | 3 --- 1 file changed, 3 deletions(-) diff --git a/macros/internal/metadata_processing/get_escape_characters.sql b/macros/internal/metadata_processing/get_escape_characters.sql index 12ffb08b..490a8677 100644 --- a/macros/internal/metadata_processing/get_escape_characters.sql +++ b/macros/internal/metadata_processing/get_escape_characters.sql @@ -52,6 +52,3 @@ {%- do return (('"', '"')) -%} {%- endmacro %} -{%- macro duckdb__get_escape_characters() %} - {%- do return (('', '')) -%} -{%- endmacro %} From a0474f1e30110dbef324816069e476957444d7d2 Mon Sep 17 00:00:00 2001 From: Palak Mahajan <130cpalak@gmail.com> Date: Sun, 25 Aug 2024 17:36:33 +0530 Subject: [PATCH 6/7] dbt vault replaced by automate-dv for hubs, links and sats --- macros/tables/duckdb/hub.sql | 28 ++++++++++++------------ macros/tables/duckdb/link.sql | 36 +++++++++++++++---------------- macros/tables/duckdb/sat.sql | 40 +++++++++++++++++------------------ 3 files changed, 52 insertions(+), 52 deletions(-) diff --git a/macros/tables/duckdb/hub.sql b/macros/tables/duckdb/hub.sql index 3db8a157..d7c3b4ce 100644 --- a/macros/tables/duckdb/hub.sql +++ b/macros/tables/duckdb/hub.sql @@ -1,12 +1,12 @@ {%- macro duckdb__hub(src_pk, src_nk, src_extra_columns, src_ldts, src_source, source_model) -%} -{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_nk, src_extra_columns, src_ldts, src_source]) -%} +{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_nk, src_extra_columns, src_ldts, src_source]) -%} {%- if model.config.materialized == 'vault_insert_by_rank' %} - {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} + {%- set source_cols_with_rank = source_cols + automate_dv.escape_column_names([config.get('rank_column')]) -%} {%- endif -%} -{{ dbtvault.prepend_generated_by() }} +{{ automate_dv.prepend_generated_by() }} {{ 'WITH ' -}} @@ -25,13 +25,13 @@ row_rank_{{ source_number }} AS ( strategy used by Snowflake ROW_NUMBER() OVER( PARTITION BY ... -#} {%- if model.config.materialized == 'vault_insert_by_rank' %} - SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'rr') }}) {{ dbtvault.prefix(source_cols_with_rank, 'rr') }} + SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'rr') }}) {{ automate_dv.prefix(source_cols_with_rank, 'rr') }} {%- else %} - SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'rr') }}) {{ dbtvault.prefix(source_cols, 'rr') }} + SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'rr') }}) {{ automate_dv.prefix(source_cols, 'rr') }} {%- endif %} FROM {{ ref(src) }} AS rr - WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} - ORDER BY {{ dbtvault.prefix([src_pk], 'rr') }}, {{ dbtvault.prefix([src_ldts], 'rr') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + ORDER BY {{ automate_dv.prefix([src_pk], 'rr') }}, {{ automate_dv.prefix([src_ldts], 'rr') }} {%- set ns.last_cte = "row_rank_{}".format(source_number) %} ),{{ "\n" if not loop.last }} {% endfor -%} @@ -67,20 +67,20 @@ row_rank_union AS ( {#- PostgreSQL has DISTINCT ON which should be more performant than the strategy used by Snowflake ROW_NUMBER() OVER( PARTITION BY ... -#} - SELECT DISTINCT ON ({{ dbtvault.prefix([src_pk], 'ru') }}) ru.* + SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'ru') }}) ru.* FROM {{ ns.last_cte }} AS ru - WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} - ORDER BY {{ dbtvault.prefix([src_pk], 'ru') }}, {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + ORDER BY {{ automate_dv.prefix([src_pk], 'ru') }}, {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC {%- set ns.last_cte = "row_rank_union" %} ), {% endif %} records_to_insert AS ( - SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }} FROM {{ ns.last_cte }} AS a - {%- if dbtvault.is_any_incremental() %} + {%- if automate_dv.is_any_incremental() %} LEFT JOIN {{ this }} AS d - ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} - WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }} {%- endif %} ) diff --git a/macros/tables/duckdb/link.sql b/macros/tables/duckdb/link.sql index f50e9fa2..967e44f9 100644 --- a/macros/tables/duckdb/link.sql +++ b/macros/tables/duckdb/link.sql @@ -1,13 +1,13 @@ {%- macro duckdb__link(src_pk, src_fk, src_extra_columns, src_ldts, src_source, source_model) -%} -{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_fk, src_extra_columns, src_ldts, src_source]) -%} -{%- set fk_cols = dbtvault.expand_column_list([src_fk]) -%} +{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_fk, src_extra_columns, src_ldts, src_source]) -%} +{%- set fk_cols = automate_dv.expand_column_list([src_fk]) -%} {%- if model.config.materialized == 'vault_insert_by_rank' %} - {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} + {%- set source_cols_with_rank = source_cols + automate_dv.escape_column_names([config.get('rank_column')]) -%} {%- endif -%} -{{ dbtvault.prepend_generated_by() }} +{{ automate_dv.prepend_generated_by() }} {{ 'WITH ' -}} @@ -24,18 +24,18 @@ row_rank_{{ source_number }} AS ( SELECT * FROM ( {%- if model.config.materialized == 'vault_insert_by_rank' %} - SELECT {{ dbtvault.prefix(source_cols_with_rank, 'rr') }}, + SELECT {{ automate_dv.prefix(source_cols_with_rank, 'rr') }}, {%- else %} - SELECT {{ dbtvault.prefix(source_cols, 'rr') }}, + SELECT {{ automate_dv.prefix(source_cols, 'rr') }}, {%- endif %} ROW_NUMBER() OVER( - PARTITION BY {{ dbtvault.prefix([src_pk], 'rr') }} - ORDER BY {{ dbtvault.prefix([src_ldts], 'rr') }} + PARTITION BY {{ automate_dv.prefix([src_pk], 'rr') }} + ORDER BY {{ automate_dv.prefix([src_ldts], 'rr') }} ) AS row_number FROM {{ ref(src) }} AS rr {%- if source_model | length == 1 %} - WHERE {{ dbtvault.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} - AND {{ dbtvault.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }} + AND {{ automate_dv.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }} {%- endif %} ) as l WHERE row_number = 1 @@ -74,24 +74,24 @@ row_rank_union AS ( SELECT * FROM ( SELECT ru.*, ROW_NUMBER() OVER( - PARTITION BY {{ dbtvault.prefix([src_pk], 'ru') }} - ORDER BY {{ dbtvault.prefix([src_ldts], 'ru') }}, {{ dbtvault.prefix([src_source], 'ru') }} ASC + PARTITION BY {{ automate_dv.prefix([src_pk], 'ru') }} + ORDER BY {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC ) AS row_rank_number FROM {{ ns.last_cte }} AS ru - WHERE {{ dbtvault.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} - AND {{ dbtvault.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }} + AND {{ automate_dv.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }} ) AS a WHERE row_rank_number = 1 {%- set ns.last_cte = "row_rank_union" %} ), {% endif %} records_to_insert AS ( - SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='target') }} + SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }} FROM {{ ns.last_cte }} AS a - {%- if dbtvault.is_any_incremental() %} + {%- if automate_dv.is_any_incremental() %} LEFT JOIN {{ this }} AS d - ON {{ dbtvault.multikey(src_pk, prefix=['a','d'], condition='=') }} - WHERE {{ dbtvault.multikey(src_pk, prefix='d', condition='IS NULL') }} + ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }} {%- endif %} ) diff --git a/macros/tables/duckdb/sat.sql b/macros/tables/duckdb/sat.sql index 461de73d..d6a64198 100644 --- a/macros/tables/duckdb/sat.sql +++ b/macros/tables/duckdb/sat.sql @@ -3,25 +3,25 @@ -{%- set source_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source]) -%} +{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_hashdiff, src_payload, src_extra_columns, src_eff, src_ldts, src_source]) -%} -{%- set rank_cols = dbtvault.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} +{%- set rank_cols = automate_dv.expand_column_list(columns=[src_pk, src_hashdiff, src_ldts]) -%} -{%- set pk_cols = dbtvault.expand_column_list(columns=[src_pk]) -%} +{%- set pk_cols = automate_dv.expand_column_list(columns=[src_pk]) -%} {%- if model.config.materialized == 'vault_insert_by_rank' %} - {%- set source_cols_with_rank = source_cols + dbtvault.escape_column_names([config.get('rank_column')]) -%} + {%- set source_cols_with_rank = source_cols + automate_dv.escape_column_names([config.get('rank_column')]) -%} {%- endif -%} -{{ dbtvault.prepend_generated_by() }} +{{ automate_dv.prepend_generated_by() }} @@ -30,17 +30,17 @@ WITH source_data AS ( {%- if model.config.materialized == 'vault_insert_by_rank' %} - SELECT {{ dbtvault.prefix(source_cols_with_rank, 'a', alias_target='source') }} + SELECT {{ automate_dv.prefix(source_cols_with_rank, 'a', alias_target='source') }} {%- else %} - SELECT {{ dbtvault.prefix(source_cols, 'a', alias_target='source') }} + SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='source') }} {%- endif %} FROM {{ ref(source_model) }} AS a - WHERE {{ dbtvault.multikey(src_pk, prefix='a', condition='IS NOT NULL') }} + WHERE {{ automate_dv.multikey(src_pk, prefix='a', condition='IS NOT NULL') }} {%- if model.config.materialized == 'vault_insert_by_period' %} @@ -57,24 +57,24 @@ WITH source_data AS ( -{%- if dbtvault.is_any_incremental() %} +{%- if automate_dv.is_any_incremental() %} latest_records AS ( - SELECT {{ dbtvault.prefix(rank_cols, 'a', alias_target='target') }} + SELECT {{ automate_dv.prefix(rank_cols, 'a', alias_target='target') }} FROM ( - SELECT {{ dbtvault.prefix(rank_cols, 'current_records', alias_target='target') }}, + SELECT {{ automate_dv.prefix(rank_cols, 'current_records', alias_target='target') }}, RANK() OVER ( - PARTITION BY {{ dbtvault.prefix([src_pk], 'current_records') }} + PARTITION BY {{ automate_dv.prefix([src_pk], 'current_records') }} - ORDER BY {{ dbtvault.prefix([src_ldts], 'current_records') }} DESC + ORDER BY {{ automate_dv.prefix([src_ldts], 'current_records') }} DESC ) AS rank @@ -82,13 +82,13 @@ latest_records AS ( JOIN ( - SELECT DISTINCT {{ dbtvault.prefix([src_pk], 'source_data') }} + SELECT DISTINCT {{ automate_dv.prefix([src_pk], 'source_data') }} FROM source_data ) AS source_records - ON {{ dbtvault.multikey(src_pk, prefix=['current_records','source_records'], condition='=') }} + ON {{ automate_dv.multikey(src_pk, prefix=['current_records','source_records'], condition='=') }} ) AS a @@ -106,19 +106,19 @@ latest_records AS ( records_to_insert AS ( - SELECT DISTINCT {{ dbtvault.alias_all(source_cols, 'stage') }} + SELECT DISTINCT {{ automate_dv.alias_all(source_cols, 'stage') }} FROM source_data AS stage - {%- if dbtvault.is_any_incremental() %} + {%- if automate_dv.is_any_incremental() %} LEFT JOIN latest_records - ON {{ dbtvault.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }} + ON {{ automate_dv.multikey(src_pk, prefix=['latest_records','stage'], condition='=') }} - WHERE {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ dbtvault.prefix([src_hashdiff], 'stage') }} + WHERE {{ automate_dv.prefix([src_hashdiff], 'latest_records', alias_target='target') }} != {{ automate_dv.prefix([src_hashdiff], 'stage') }} - OR {{ dbtvault.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL + OR {{ automate_dv.prefix([src_hashdiff], 'latest_records', alias_target='target') }} IS NULL {%- endif %} From d7aa562580e37a7f751bc0ec49f1d4e416031ef2 Mon Sep 17 00:00:00 2001 From: Palak Mahajan <130cpalak@gmail.com> Date: Mon, 26 Aug 2024 11:31:05 +0530 Subject: [PATCH 7/7] Added fix to use derived columns --- macros/internal/metadata_processing/escape_column_names.sql | 6 +----- .../internal/metadata_processing/get_escape_characters.sql | 3 +++ 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/macros/internal/metadata_processing/escape_column_names.sql b/macros/internal/metadata_processing/escape_column_names.sql index 03be2207..33069451 100644 --- a/macros/internal/metadata_processing/escape_column_names.sql +++ b/macros/internal/metadata_processing/escape_column_names.sql @@ -97,8 +97,4 @@ {%- endif -%} -{%- endmacro -%} - -{%- macro duckdb__get_escape_characters() %} - {%- do return (('', '')) -%} -{%- endmacro %} \ No newline at end of file +{%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/metadata_processing/get_escape_characters.sql b/macros/internal/metadata_processing/get_escape_characters.sql index 490a8677..b5d89fea 100644 --- a/macros/internal/metadata_processing/get_escape_characters.sql +++ b/macros/internal/metadata_processing/get_escape_characters.sql @@ -52,3 +52,6 @@ {%- do return (('"', '"')) -%} {%- endmacro %} +{%- macro duckdb__get_escape_characters() %} + {%- do return (('"', '"')) -%} +{%- endmacro %} \ No newline at end of file