Skip to content
4 changes: 4 additions & 0 deletions macros/internal/metadata_processing/get_escape_characters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@
{%- macro postgres__get_escape_characters() %}
    {#- PostgreSQL variant: hands back a two-element tuple holding a double quote in both positions. -#}
    {%- set escape_pair = ('"', '"') -%}
    {%- do return(escape_pair) -%}
{%- endmacro %}

{%- macro duckdb__get_escape_characters() %}
    {#- DuckDB variant: hands back a two-element tuple holding a double quote in both positions. -#}
    {%- set escape_pair = ('"', '"') -%}
    {%- do return(escape_pair) -%}
{%- endmacro %}
16 changes: 16 additions & 0 deletions macros/materialisations/period_mat_helpers/get_period_of_load.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,19 @@

{% do return(period_of_load) %}
{%- endmacro -%}


{%- macro duckdb__get_period_of_load(period, offset, start_timestamp) -%}
    {#-
        Runs a query that adds `offset` periods to `start_timestamp`, truncates
        the result to `period` precision and returns it as a string.

        period          - date part name; used as the DATE_TRUNC precision and as the interval unit
        offset          - number of periods to add to start_timestamp
        start_timestamp - timestamp text, interpolated into the query as a quoted literal

        DuckDB's TO_TIMESTAMP() only accepts a numeric epoch value, so the
        PostgreSQL-style TO_TIMESTAMP(text, format) call cannot be used here;
        the timestamp text is converted with CAST instead.
    -#}
    {% set period_of_load_sql -%}
        SELECT DATE_TRUNC('{{ period }}',
            CAST('{{ start_timestamp }}' AS TIMESTAMP) + INTERVAL '{{ offset }} {{ period }}'
        ) AS period_of_load
    {%- endset %}

    {% set period_of_load_dict = dbtvault.get_query_results_as_dict(period_of_load_sql) %}

    {#- The result dictionary is indexed with the upper-cased column alias; only the first row is used. -#}
    {% set period_of_load = period_of_load_dict['PERIOD_OF_LOAD'][0] | string %}

    {% do return(period_of_load) %}
{%- endmacro -%}
9 changes: 9 additions & 0 deletions macros/staging/null_columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,13 @@
{{ col_name }} AS {{ col_name ~ "_ORIGINAL" }},
COALESCE({{ col_name }}, '{{ default_value }}') AS {{ col_name }}

{%- endmacro -%}

{%- macro duckdb__null_column_sql(col_name, default_value) -%}

{#- Emits two select-list entries: the untouched column under "<col_name>_ORIGINAL", -#}
{#- and the column with NULLs replaced by the quoted default_value under its own name. -#}
{%- set original_name = col_name ~ "_ORIGINAL" -%}
{%- set escaped_col = dbtvault.escape_column_names(col_name) -%}
{%- set escaped_original_col = dbtvault.escape_column_names(original_name) -%}
{{ escaped_col }} AS {{ escaped_original_col }},
COALESCE({{ escaped_col }}, '{{ default_value }}') AS {{ escaped_col }}

{%- endmacro -%}
8 changes: 7 additions & 1 deletion macros/supporting/casting/cast_date.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,10 @@

{%- if alias %} AS {{ alias }} {%- endif %}

{%- endmacro -%}
{%- endmacro -%}

{%- macro duckdb__cast_date(column_str, as_string=false, datetime=false, alias=none) -%}

{#- No DuckDB-specific logic: every argument is forwarded unchanged to the Snowflake variant. -#}
{{ dbtvault.snowflake__cast_date(
    column_str=column_str,
    as_string=as_string,
    datetime=datetime,
    alias=alias
) }}

{%- endmacro -%}
10 changes: 9 additions & 1 deletion macros/supporting/casting/cast_datetime.sql
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,12 @@

{%- if alias %} AS {{ alias }} {%- endif %}

{%- endmacro -%}
{%- endmacro -%}

{%- macro duckdb__cast_datetime(column_str, as_string=false, alias=none, date_type=none) -%}

{#- Renders `column_str` as a TIMESTAMP truncated to millisecond precision,            -#}
{#- optionally followed by "AS <alias>".                                               -#}
{#-   column_str - SQL expression or quoted literal to convert                         -#}
{#-   alias      - optional output alias                                               -#}
{#-   as_string, date_type - accepted for signature parity; not used by this variant   -#}
{#- The PostgreSQL form, to_char(timestamp <literal>, '...MS')::timestamp, depends on  -#}
{#- to_char and on the typed-literal syntax (string literals only). CAST + DATE_TRUNC  -#}
{#- gives the same millisecond-truncated timestamp and also accepts column expressions. -#}
DATE_TRUNC('millisecond', CAST({{ column_str }} AS TIMESTAMP))

{%- if alias %} AS {{ alias }} {%- endif %}

{%- endmacro -%}
88 changes: 88 additions & 0 deletions macros/supporting/hash.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,92 @@

{{ automate_dv.default__hash(columns=columns, alias=alias, is_hashdiff=is_hashdiff, columns_to_escape=columns_to_escape) }}

{%- endmacro -%}

{%- macro duckdb__hash(columns, alias, is_hashdiff, columns_to_escape=none) -%}

    {#-
        Renders a SQL expression that hashes one or more columns and aliases
        the result as `alias` (escaped).

        columns           - a single column/expression (string) or a list of them
        alias             - output column name
        is_hashdiff       - when truthy and `columns` is a list, the columns are sorted
                            before hashing and the all-null NULLIF guard is omitted
        columns_to_escape - accepted for signature parity; not used by this variant

        Project vars read: 'hash' (default 'MD5'; 'SHA' selects SHA256),
        'concat_string' (default '||') and 'null_placeholder_string' (default '^^').
    -#}

    {%- set hash = var('hash', 'MD5') -%}
    {%- set concat_string = var('concat_string', '||') -%}
    {%- set null_placeholder_string = var('null_placeholder_string', '^^') -%}

    {#- Select hashing algorithm: 'SHA' means SHA256, any other value falls back to MD5 -#}
    {%- set hash_alg = 'SHA256' if hash == 'SHA' else 'MD5' -%}

    {#- Select hashing expression (left and right sides)                                  -#}
    {#- DuckDB's md5() and sha256() both take text and return the digest as hex text,     -#}
    {#- so both are plain function calls. The PostgreSQL wrapper                          -#}
    {#- ENCODE(SHA256(CAST(val AS BYTEA)), 'hex') is not usable: DuckDB's encode() takes  -#}
    {#- a single argument.                                                                -#}
    {%- if hash_alg == 'SHA256' -%}
        {%- set hash_expr_left = 'SHA256(' -%}
    {%- else -%}
        {%- set hash_expr_left = 'MD5(' -%}
    {%- endif -%}
    {%- set hash_expr_right = ')' -%}

    {#- Per-column normalisation: cast to text, trim, upper-case, and turn '' into NULL -#}
    {%- set standardise = "NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS VARCHAR))), '')" -%}

    {#- Alpha sort columns before hashing if a hashdiff -#}
    {%- if is_hashdiff and automate_dv.is_list(columns) -%}
        {%- set columns = columns|sort -%}
    {%- endif -%}

    {#- If single column to hash -#}
    {%- if columns is string -%}
        {%- set column_str = automate_dv.as_constant(columns) -%}
        {#- Expressions are used verbatim; plain column names are escaped -#}
        {%- if automate_dv.is_expression(column_str) -%}
            {%- set escaped_column_str = column_str -%}
        {%- else -%}
            {%- set escaped_column_str = automate_dv.escape_column_names(column_str) -%}
        {%- endif -%}

        {{- "CAST(UPPER({}{}{}) AS BYTEA) AS {}".format(hash_expr_left, standardise | replace('[EXPRESSION]', escaped_column_str), hash_expr_right, automate_dv.escape_column_names(alias)) | indent(4) -}}

    {#- Else a list of columns to hash -#}
    {%- else -%}
        {#- all_null accumulates the string an all-NULL row would concatenate to -#}
        {#- (placeholder, separator, placeholder, ...)                           -#}
        {%- set all_null = [] -%}

        {%- if is_hashdiff -%}
            {{- "CAST(UPPER({}CONCAT_WS('{}',".format(hash_expr_left, concat_string) | indent(4) -}}
        {%- else -%}
            {#- Non-hashdiff: NULLIF against the all-null string makes the hash NULL when every column is NULL -#}
            {{- "CAST(UPPER({}NULLIF(CONCAT_WS('{}',".format(hash_expr_left, concat_string) | indent(4) -}}
        {%- endif -%}

        {%- for column in columns -%}

            {%- do all_null.append(null_placeholder_string) -%}

            {%- set column_str = automate_dv.as_constant(column) -%}
            {%- if automate_dv.is_expression(column_str) -%}
                {%- set escaped_column_str = column_str -%}
            {%- else -%}
                {%- set escaped_column_str = automate_dv.escape_column_names(column_str) -%}
            {%- endif -%}

            {{- "\nCOALESCE({}, '{}')".format(standardise | replace('[EXPRESSION]', escaped_column_str), null_placeholder_string) | indent(4) -}}
            {{- "," if not loop.last -}}

            {%- if loop.last -%}

                {#- Close the expression opened before the loop -#}
                {%- if is_hashdiff -%}
                    {{- "\n){}) AS BYTEA) AS {}".format(hash_expr_right, automate_dv.escape_column_names(alias)) -}}
                {%- else -%}
                    {{- "\n), '{}'){}) AS BYTEA) AS {}".format(all_null | join(""), hash_expr_right, automate_dv.escape_column_names(alias)) -}}
                {%- endif -%}
            {%- else -%}

                {%- do all_null.append(concat_string) -%}

            {%- endif -%}
        {%- endfor -%}

    {%- endif -%}

{%- endmacro -%}
89 changes: 89 additions & 0 deletions macros/tables/duckdb/hub.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
{%- macro duckdb__hub(src_pk, src_nk, src_extra_columns, src_ldts, src_source, source_model) -%}

{#-
    Renders the SELECT statement that loads a hub.

    src_pk, src_nk, src_extra_columns, src_ldts, src_source
                 - column specifications; together they form the selected column list
    source_model - one model name or a list of model names to read from via ref()

    Shape of the generated SQL:
      1. row_rank_<n>      - per source: one row per src_pk (first by src_ldts), NULL keys dropped
      2. stage_union       - only with several sources: UNION ALL of the row_rank_<n> CTEs
      3. stage_mat_filter  - only for the vault_insert_by_period / vault_insert_by_rank
                             materialisations; contains a __PERIOD_FILTER__ / __RANK_FILTER__
                             placeholder (presumably substituted by the materialisation - confirm)
      4. row_rank_union    - only with several sources: one row per src_pk across all sources
      5. records_to_insert - in incremental runs, only keys not already present in the target
-#}

{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_nk, src_extra_columns, src_ldts, src_source]) -%}

{#- The rank materialisation also needs the configured rank column carried through the first CTE -#}
{%- if model.config.materialized == 'vault_insert_by_rank' %}
{%- set source_cols_with_rank = source_cols + automate_dv.escape_column_names([config.get('rank_column')]) -%}
{%- endif -%}

{{ automate_dv.prepend_generated_by() }}

{{ 'WITH ' -}}

{#- Wrap a single source model in a list so the loops below handle one or many sources alike -#}
{%- if not (source_model is iterable and source_model is not string) -%}
{%- set source_model = [source_model] -%}
{%- endif -%}

{#- ns.last_cte holds the name of the most recently emitted CTE; each later CTE selects from it -#}
{%- set ns = namespace(last_cte= "") -%}

{%- for src in source_model -%}

{%- set source_number = loop.index | string -%}

row_rank_{{ source_number }} AS (
{#- DISTINCT ON combined with the ORDER BY below keeps the first row per src_pk
    (lowest src_ldts). Original author's note, carried over from the PostgreSQL
    variant: this should be more performant than the Snowflake strategy of
    ROW_NUMBER() OVER( PARTITION BY ...
-#}
{%- if model.config.materialized == 'vault_insert_by_rank' %}
SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'rr') }}) {{ automate_dv.prefix(source_cols_with_rank, 'rr') }}
{%- else %}
SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'rr') }}) {{ automate_dv.prefix(source_cols, 'rr') }}
{%- endif %}
FROM {{ ref(src) }} AS rr
WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }}
ORDER BY {{ automate_dv.prefix([src_pk], 'rr') }}, {{ automate_dv.prefix([src_ldts], 'rr') }}
{%- set ns.last_cte = "row_rank_{}".format(source_number) %}
),{{ "\n" if not loop.last }}
{% endfor -%}
{% if source_model | length > 1 %}
stage_union AS (
{%- for src in source_model %}
SELECT * FROM row_rank_{{ loop.index | string }}
{%- if not loop.last %}
UNION ALL
{%- endif %}
{%- endfor %}
{%- set ns.last_cte = "stage_union" %}
),
{%- endif -%}
{%- if model.config.materialized == 'vault_insert_by_period' %}
stage_mat_filter AS (
SELECT *
FROM {{ ns.last_cte }}
WHERE __PERIOD_FILTER__
{%- set ns.last_cte = "stage_mat_filter" %}
),
{%- elif model.config.materialized == 'vault_insert_by_rank' %}
stage_mat_filter AS (
SELECT *
FROM {{ ns.last_cte }}
WHERE __RANK_FILTER__
{%- set ns.last_cte = "stage_mat_filter" %}
),
{%- endif -%}
{%- if source_model | length > 1 %}

row_rank_union AS (
{#- Second de-duplication pass across the unioned sources: DISTINCT ON keeps the
    first row per src_pk, ordered by src_ldts and then src_source. Original
    author's note, carried over from the PostgreSQL variant: this should be more
    performant than the Snowflake strategy of ROW_NUMBER() OVER( PARTITION BY ...
-#}
SELECT DISTINCT ON ({{ automate_dv.prefix([src_pk], 'ru') }}) ru.*
FROM {{ ns.last_cte }} AS ru
WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }}
ORDER BY {{ automate_dv.prefix([src_pk], 'ru') }}, {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC
{%- set ns.last_cte = "row_rank_union" %}
),
{% endif %}
records_to_insert AS (
SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }}
FROM {{ ns.last_cte }} AS a
{%- if automate_dv.is_any_incremental() %}
LEFT JOIN {{ this }} AS d
ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }}
WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }}
{%- endif %}
)

SELECT * FROM records_to_insert

{%- endmacro -%}
100 changes: 100 additions & 0 deletions macros/tables/duckdb/link.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{%- macro duckdb__link(src_pk, src_fk, src_extra_columns, src_ldts, src_source, source_model) -%}

{#-
    Renders the SELECT statement that loads a link.

    src_pk, src_fk, src_extra_columns, src_ldts, src_source
                 - column specifications; together they form the selected column list
    source_model - one model name or a list of model names to read from via ref()

    Shape of the generated SQL:
      1. row_rank_<n>      - per source: the row numbered 1 per src_pk when ordered by src_ldts
      2. stage_union       - only with several sources: UNION ALL of the row_rank_<n> CTEs
      3. stage_mat_filter  - only for the vault_insert_by_period / vault_insert_by_rank
                             materialisations; contains a __PERIOD_FILTER__ / __RANK_FILTER__
                             placeholder (presumably substituted by the materialisation - confirm)
      4. row_rank_union    - only with several sources: one row per src_pk across all sources
      5. records_to_insert - in incremental runs, only keys not already present in the target
-#}

{%- set source_cols = automate_dv.expand_column_list(columns=[src_pk, src_fk, src_extra_columns, src_ldts, src_source]) -%}
{%- set fk_cols = automate_dv.expand_column_list([src_fk]) -%}

{#- The rank materialisation also needs the configured rank column carried through the first CTE -#}
{%- if model.config.materialized == 'vault_insert_by_rank' %}
{%- set source_cols_with_rank = source_cols + automate_dv.escape_column_names([config.get('rank_column')]) -%}
{%- endif -%}

{{ automate_dv.prepend_generated_by() }}

{{ 'WITH ' -}}

{#- Wrap a single source model in a list so the loops below handle one or many sources alike -#}
{%- if not (source_model is iterable and source_model is not string) -%}
{%- set source_model = [source_model] -%}
{%- endif -%}

{#- ns.last_cte holds the name of the most recently emitted CTE; each later CTE selects from it -#}
{%- set ns = namespace(last_cte= "") -%}

{%- for src in source_model -%}

{%- set source_number = loop.index | string -%}

row_rank_{{ source_number }} AS (
SELECT * FROM (
{%- if model.config.materialized == 'vault_insert_by_rank' %}
SELECT {{ automate_dv.prefix(source_cols_with_rank, 'rr') }},
{%- else %}
SELECT {{ automate_dv.prefix(source_cols, 'rr') }},
{%- endif %}
ROW_NUMBER() OVER(
PARTITION BY {{ automate_dv.prefix([src_pk], 'rr') }}
ORDER BY {{ automate_dv.prefix([src_ldts], 'rr') }}
) AS row_number
FROM {{ ref(src) }} AS rr
{#- With a single source the NULL-key filter is applied here; with several sources it is applied in row_rank_union instead -#}
{%- if source_model | length == 1 %}
WHERE {{ automate_dv.multikey(src_pk, prefix='rr', condition='IS NOT NULL') }}
AND {{ automate_dv.multikey(fk_cols, prefix='rr', condition='IS NOT NULL') }}
{%- endif %}
) as l
WHERE row_number = 1
{%- set ns.last_cte = "row_rank_{}".format(source_number) %}
),{{ "\n" if not loop.last }}
{% endfor -%}
{% if source_model | length > 1 %}
stage_union AS (
{%- for src in source_model %}
SELECT * FROM row_rank_{{ loop.index | string }}
{%- if not loop.last %}
UNION ALL
{%- endif %}
{%- endfor %}
{%- set ns.last_cte = "stage_union" %}
),
{%- endif -%}
{%- if model.config.materialized == 'vault_insert_by_period' %}
stage_mat_filter AS (
SELECT *
FROM {{ ns.last_cte }}
WHERE __PERIOD_FILTER__
{%- set ns.last_cte = "stage_mat_filter" %}
),
{%- elif model.config.materialized == 'vault_insert_by_rank' %}
stage_mat_filter AS (
SELECT *
FROM {{ ns.last_cte }}
WHERE __RANK_FILTER__
{%- set ns.last_cte = "stage_mat_filter" %}
),
{% endif %}
{%- if source_model | length > 1 %}

row_rank_union AS (
SELECT * FROM (
SELECT ru.*,
ROW_NUMBER() OVER(
PARTITION BY {{ automate_dv.prefix([src_pk], 'ru') }}
ORDER BY {{ automate_dv.prefix([src_ldts], 'ru') }}, {{ automate_dv.prefix([src_source], 'ru') }} ASC
) AS row_rank_number
FROM {{ ns.last_cte }} AS ru
WHERE {{ automate_dv.multikey(src_pk, prefix='ru', condition='IS NOT NULL') }}
AND {{ automate_dv.multikey(fk_cols, prefix='ru', condition='IS NOT NULL') }}
) AS a
WHERE row_rank_number = 1
{%- set ns.last_cte = "row_rank_union" %}
),
{% endif %}
records_to_insert AS (
SELECT {{ automate_dv.prefix(source_cols, 'a', alias_target='target') }}
FROM {{ ns.last_cte }} AS a
{%- if automate_dv.is_any_incremental() %}
LEFT JOIN {{ this }} AS d
ON {{ automate_dv.multikey(src_pk, prefix=['a','d'], condition='=') }}
WHERE {{ automate_dv.multikey(src_pk, prefix='d', condition='IS NULL') }}
{%- endif %}
)

SELECT * FROM records_to_insert

{%- endmacro -%}
Loading