9 changes: 9 additions & 0 deletions macros/internal/helpers/logging/log_relation_sources.sql
@@ -18,6 +18,15 @@

{% macro databricks__log_relation_sources(relation, source_count) %}

{%- if execute and automate_dv.is_something(invocation_args_dict.get('which')) and invocation_args_dict.get('which') != 'docs' -%}

{%- do dbt_utils.log_info('Loading {} from {} source(s)'.format("{}.{}".format(relation.schema, relation.identifier),
source_count)) -%}
{%- endif -%}
{% endmacro %}

{% macro spark__log_relation_sources(relation, source_count) %}

{%- if execute and automate_dv.is_something(invocation_args_dict.get('which')) and invocation_args_dict.get('which') != 'docs' -%}

{%- do dbt_utils.log_info('Loading {} from {} source(s)'.format("{}.{}".format(relation.schema, relation.identifier),
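For context: when these guards pass, each macro logs a line of the form "Loading <schema>.<identifier> from <n> source(s)". A sketch of the rendered output for a hub loaded from two staging sources (relation name illustrative):

    Loading dv.hub_customer from 2 source(s)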
6 changes: 6 additions & 0 deletions macros/internal/metadata_processing/concat_ws.sql
@@ -47,3 +47,9 @@ CONCAT(
{{ automate_dv.default__concat_ws(string_list=string_list, separator=separator) }}

{%- endmacro -%}

{%- macro spark__concat_ws(string_list, separator="||") -%}

{{ automate_dv.default__concat_ws(string_list=string_list, separator=separator) }}

{%- endmacro -%}
4 changes: 4 additions & 0 deletions macros/internal/metadata_processing/get_escape_characters.sql
@@ -48,6 +48,10 @@
{%- do return (('`', '`')) -%}
{%- endmacro %}

{%- macro spark__get_escape_characters() %}
{%- do return (('`', '`')) -%}
{%- endmacro %}
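
Like Databricks, Spark escapes identifiers with backticks, so the pair returned here wraps names as follows (table and column names illustrative):

    SELECT `order id`
    FROM raw_orders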

{%- macro postgres__get_escape_characters() %}
{%- do return (('"', '"')) -%}
{%- endmacro %}
4 changes: 2 additions & 2 deletions macros/materialisations/drop_temporary.sql
@@ -4,12 +4,12 @@
*/

{% macro drop_temporary_special(tmp_relation) %}
{# In databricks and sqlserver a temporary view/table can only be dropped by #}
{# In spark, databricks and sqlserver a temporary view/table can only be dropped by #}
{# the connection or session that created it so drop it now before the commit below closes this session #}

{%- set drop_query_name = 'DROP_QUERY-' ~ i -%}
{% call statement(drop_query_name, fetch_result=True) -%}
{% if target.type == 'databricks' %}
{% if target.type in ['databricks', 'spark'] %}
DROP VIEW {{ tmp_relation }};
{% elif target.type == 'sqlserver' %}
DROP TABLE {{ tmp_relation }};
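For background: a Spark temporary view is bound to the session that created it, which is why the drop must run before the materialisation's commit closes the session. A minimal Spark SQL illustration (view name illustrative):

    CREATE OR REPLACE TEMPORARY VIEW tmp_hub_customer AS SELECT 1 AS id;
    -- no other session can see this view, so only the creating session can drop it
    DROP VIEW tmp_hub_customer;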
22 changes: 22 additions & 0 deletions macros/materialisations/incremental_pit_bridge_replace.sql
@@ -36,6 +36,17 @@
{%- endmacro %}


{% macro spark__incremental_pit_replace(tmp_relation, target_relation, statement_name="main") %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

INSERT OVERWRITE {{ target_relation }} ({{ dest_cols_csv }})
SELECT {{ dest_cols_csv }}
FROM {{ tmp_relation }};

{%- endmacro %}



{% macro incremental_bridge_replace(tmp_relation, target_relation, statement_name="main") %}

@@ -71,4 +82,15 @@
{%- endmacro %}


{% macro spark__incremental_bridge_replace(tmp_relation, target_relation, statement_name="main") %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

INSERT OVERWRITE {{ target_relation }} ({{ dest_cols_csv }})
SELECT {{ dest_cols_csv }}
FROM {{ tmp_relation }};
{%- endmacro %}
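
Both Spark variants use INSERT OVERWRITE, which replaces the target table's contents in a single statement rather than appending. A sketch of what the bridge variant might render (schema, table and column names illustrative):

    INSERT OVERWRITE dv.bridge_customer_order (`CUSTOMER_PK`, `AS_OF_DATE`, `ORDER_FK`)
    SELECT `CUSTOMER_PK`, `AS_OF_DATE`, `ORDER_FK`
    FROM dv.bridge_customer_order__dbt_tmp;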



@@ -150,6 +150,37 @@
{%- endmacro %}


{% macro spark__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%}

{%- set from_date_or_timestamp = "NULLIF('{}','none')::TIMESTAMP".format(stop_date | lower) -%}
{%- set datepart = period -%}
{%- set interval = 1 -%} {# assumption: advance one period per load cycle; `interval` is referenced below but was not set in this macro #}
{% set period_boundary_sql -%}

WITH period_data AS (
SELECT
COALESCE(MAX({{ timestamp_field }}), CAST('{{ start_date }}' AS TIMESTAMP)) AS start_timestamp,
COALESCE(
{{ automate_dv.timestamp_add(datepart, interval, from_date_or_timestamp) }},
{{ current_timestamp() }}) AS stop_timestamp
FROM {{ target_relation }}
)
SELECT
IF(stop_timestamp < start_timestamp, stop_timestamp, start_timestamp) AS start_timestamp,
stop_timestamp,
{{ datediff('start_timestamp', 'stop_timestamp', period) }} + 1 AS num_periods

FROM period_data
{%- endset %}

{% set period_boundaries_dict = automate_dv.get_query_results_as_dict(period_boundary_sql) %}

{% set period_boundaries = {'start_timestamp': period_boundaries_dict['START_TIMESTAMP'][0] | string,
'stop_timestamp': period_boundaries_dict['STOP_TIMESTAMP'][0] | string,
'num_periods': period_boundaries_dict['NUM_PERIODS'][0] | int} %}

{% do return(period_boundaries) %}
{%- endmacro %}

{% macro postgres__get_period_boundaries(target_relation, timestamp_field, start_date, stop_date, period) -%}

{% set period_boundary_sql -%}
@@ -69,6 +69,11 @@
{%- endmacro -%}


{%- macro spark__get_period_of_load(period, offset, start_timestamp) -%}
{% do return(automate_dv.default__get_period_of_load(period=period, offset=offset, start_timestamp=start_timestamp)) %}
{%- endmacro -%}


{%- macro postgres__get_period_of_load(period, offset, start_timestamp) -%}
{# Postgres uses different DateTime arithmetic #}
{% set period_of_load_sql -%}
@@ -119,7 +119,7 @@

{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{%- if not result['response']['rows_affected'] %}
{% if target.type == "databricks" and result['data'] | length > 0 %}
{% if target.type in ["databricks", "spark"] and result['data'] | length > 0 %}
{% set rows_inserted = result['data'][0][1] | int %}
{% else %}
{% set rows_inserted = 0 %}
@@ -139,9 +139,9 @@
period_of_load, rows_inserted,
model.unique_id)) }}

{# In databricks and sqlserver a temporary view/table can only be dropped by #}
{# In spark, databricks and sqlserver a temporary view/table can only be dropped by #}
{# the connection or session that created it so drop it now before the commit below closes this session #}
{% if target.type in ['databricks', 'sqlserver'] %}
{% if target.type in ['databricks', 'sqlserver', 'spark'] %}
{{ automate_dv.drop_temporary_special(tmp_relation) }}
{% else %}
{% do to_drop.append(tmp_relation) %}
@@ -166,7 +166,7 @@

{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{%- if not result['response']['rows_affected'] %}
{% if target.type == "databricks" and result['data'] | length > 0 %}
{% if target.type in ["databricks", "spark"] and result['data'] | length > 0 %}
{% set rows_inserted = result['data'][0][1] | int %}
{% else %}
{% set rows_inserted = 0 %}
@@ -95,7 +95,7 @@

{% set result = load_result(insert_query_name) %}
{% if 'response' in result.keys() %} {# added in v0.19.0 #}
{# Investigate for Databricks #}
{# Investigate for Databricks, Spark #}
{%- if result['response']['rows_affected'] == None %}
{% set rows_inserted = 0 %}
{%- else %}
@@ -114,9 +114,9 @@
rows_inserted,
model.unique_id)) }}

{# In databricks and sqlserver a temporary view/table can only be dropped by #}
{# In spark, databricks and sqlserver a temporary view/table can only be dropped by #}
{# the connection or session that created it so drop it now before the commit below closes this session #}
{% if target.type in ['databricks', 'sqlserver'] %}
{% if target.type in ['databricks', 'sqlserver', 'spark'] %}
{{ automate_dv.drop_temporary_special(tmp_relation) }}
{% else %}
{% do to_drop.append(tmp_relation) %}
7 changes: 7 additions & 0 deletions macros/supporting/casting/cast_date.sql
@@ -58,6 +58,13 @@
{%- endmacro -%}


{%- macro spark__cast_date(column_str, as_string=false, alias=none) -%}

{{ automate_dv.snowflake__cast_date(column_str=column_str, as_string=as_string, alias=alias) }}

{%- endmacro -%}


{%- macro postgres__cast_date(column_str, as_string=false, alias=none) -%}

{%- if as_string -%}
7 changes: 7 additions & 0 deletions macros/supporting/casting/cast_datetime.sql
@@ -60,6 +60,13 @@
{%- endmacro -%}


{%- macro spark__cast_datetime(column_str, as_string=false, alias=none, date_type=none) -%}

{{ automate_dv.snowflake__cast_datetime(column_str=column_str, as_string=as_string, alias=alias, date_type=date_type) }}

{%- endmacro -%}


{%- macro postgres__cast_datetime(column_str, as_string=false, alias=none, date_type=none) -%}

to_char(timestamp {{ column_str }}, 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp
10 changes: 10 additions & 0 deletions macros/supporting/data_types/type_binary.sql
@@ -43,6 +43,16 @@
{%- macro databricks__type_binary(for_dbt_compare=false) -%}
{%- set enable_native_hashes = var('enable_native_hashes', false) -%}

{%- if not enable_native_hashes -%}
STRING
{%- else -%}
BINARY
{%- endif -%}
{%- endmacro -%}

{%- macro spark__type_binary(for_dbt_compare=false) -%}
{%- set enable_native_hashes = var('enable_native_hashes', false) -%}

{%- if not enable_native_hashes -%}
STRING
{%- else -%}
14 changes: 14 additions & 0 deletions macros/supporting/data_types/type_string.sql
@@ -32,3 +32,17 @@
VARCHAR({{ char_length }})
{%- endif -%}
{%- endmacro -%}

{%- macro spark__type_string(is_hash=false, char_length=255) -%}
{%- if is_hash -%}
{%- if var('hash', 'MD5') | lower == 'md5' -%}
VARCHAR(16)
{%- elif var('hash', 'MD5') | lower == 'sha' -%}
VARCHAR(32)
{%- elif var('hash', 'MD5') | lower == 'sha1' -%}
VARCHAR(20)
{%- endif -%}
{%- else -%}
VARCHAR({{ char_length }})
{%- endif -%}
{%- endmacro -%}
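These widths match the raw digest sizes of each algorithm (MD5 = 16 bytes, SHA-1 = 20, SHA-256 = 32) rather than the hex-encoded string lengths. A quick Spark SQL check of that assumption:

    SELECT LENGTH(UNHEX(MD5('x'))),       -- 16
           LENGTH(UNHEX(SHA1('x'))),      -- 20
           LENGTH(UNHEX(SHA2('x', 256))); -- 32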
7 changes: 7 additions & 0 deletions macros/supporting/hash.sql
@@ -118,4 +118,11 @@

{{ automate_dv.default__hash(columns=columns, alias=alias, is_hashdiff=is_hashdiff, columns_to_escape=columns_to_escape) }}

{%- endmacro -%}


{%- macro spark__hash(columns, alias, is_hashdiff, columns_to_escape) -%}

{{ automate_dv.default__hash(columns=columns, alias=alias, is_hashdiff=is_hashdiff, columns_to_escape=columns_to_escape) }}

{%- endmacro -%}
36 changes: 36 additions & 0 deletions macros/supporting/hash_components/select_hash_alg.sql
@@ -73,6 +73,18 @@

{% endmacro %}

{% macro spark__hash_alg_md5() -%}

{%- set is_native_hashing = var('enable_native_hashes', false) -%}

{% if is_native_hashing %}
{%- do return('UNHEX(MD5([HASH_STRING_PLACEHOLDER]))') %}
{%- else -%}
{%- do return(automate_dv.cast_binary('UPPER(MD5([HASH_STRING_PLACEHOLDER]))', quote=false)) -%}
{%- endif -%}

{% endmacro %}


{#- SHA256 -#}

@@ -129,6 +141,18 @@

{% endmacro %}

{% macro spark__hash_alg_sha256() -%}

{%- set is_native_hashing = var('enable_native_hashes', false) -%}

{% if is_native_hashing %}
{%- do return('UNHEX(SHA2([HASH_STRING_PLACEHOLDER], 256))') %}
{%- else -%}
{%- do return(automate_dv.cast_binary('UPPER(SHA2([HASH_STRING_PLACEHOLDER], 256))', quote=false)) -%}
{%- endif -%}

{% endmacro %}

{#- SHA1 -#}

{%- macro hash_alg_sha1() -%}
@@ -180,3 +204,15 @@
{%- endif -%}

{% endmacro %}

{% macro spark__hash_alg_sha1() -%}

{%- set is_native_hashing = var('enable_native_hashes', false) -%}

{% if is_native_hashing %}
{%- do return('UNHEX(SHA1([HASH_STRING_PLACEHOLDER]))') %}
{%- else -%}
{%- do return(automate_dv.cast_binary('UPPER(SHA1([HASH_STRING_PLACEHOLDER]))', quote=false)) -%}
{%- endif -%}

{% endmacro %}
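Each variant returns a template string in which [HASH_STRING_PLACEHOLDER] is later replaced with the concatenated column expression. With native hashes enabled, the SHA-256 variant would render along these lines (column names illustrative):

    UNHEX(SHA2(CONCAT_WS('||', customer_id, customer_name), 256))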
17 changes: 17 additions & 0 deletions macros/supporting/hash_components/standard_column_wrapper.sql
@@ -53,6 +53,23 @@
{%- endmacro -%}


{%- macro spark__standard_column_wrapper(hash_content_casing) -%}

{%- if hash_content_casing == 'upper' -%}
{%- set standardise -%}
NULLIF(UPPER(TRIM(CAST([EXPRESSION] AS {{ automate_dv.type_string(is_hash=true) }}))), '')
{%- endset -%}
{%- else -%}
{%- set standardise -%}
NULLIF(TRIM(CAST([EXPRESSION] AS {{ automate_dv.type_string(is_hash=true) }})), '')
{%- endset -%}
{%- endif -%}

{% do return(standardise) -%}

{%- endmacro -%}
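
Applied to a column, the upper-casing branch yields an expression of this shape, assuming the MD5 hash width from type_string (column name illustrative):

    NULLIF(UPPER(TRIM(CAST(customer_id AS VARCHAR(16)))), '')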


{%- macro sqlserver__standard_column_wrapper(hash_content_casing) -%}

{%- if hash_content_casing == 'upper' -%}
16 changes: 16 additions & 0 deletions macros/tables/spark/bridge.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) Business Thinking Ltd. 2019-2025
* This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault
*/

{%- macro spark__bridge(src_pk, src_extra_columns, as_of_dates_table, bridge_walk, stage_tables_ldts, src_ldts, source_model) -%}

{{- automate_dv.default__bridge(src_pk=src_pk,
src_extra_columns=src_extra_columns,
src_ldts=src_ldts,
as_of_dates_table=as_of_dates_table,
bridge_walk=bridge_walk,
stage_tables_ldts=stage_tables_ldts,
source_model=source_model) -}}

{%- endmacro -%}
14 changes: 14 additions & 0 deletions macros/tables/spark/eff_sat.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (c) Business Thinking Ltd. 2019-2025
* This software includes code developed by the AutomateDV (f.k.a dbtvault) Team at Business Thinking Ltd. Trading as Datavault
*/

{%- macro spark__eff_sat(src_pk, src_dfk, src_sfk, src_extra_columns, src_start_date, src_end_date, src_eff, src_ldts, src_source, source_model) -%}

{{- automate_dv.default__eff_sat(src_pk=src_pk, src_dfk=src_dfk, src_sfk=src_sfk,
src_extra_columns=src_extra_columns,
src_start_date=src_start_date, src_end_date=src_end_date,
src_eff=src_eff, src_ldts=src_ldts, src_source=src_source,
source_model=source_model) -}}

{%- endmacro -%}
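A minimal model sketch that would exercise this new dispatch path on a Spark target (model name, column names and parameter values all illustrative):

    {{ automate_dv.eff_sat(src_pk='CUSTOMER_ORDER_PK', src_dfk='CUSTOMER_PK', src_sfk='ORDER_PK',
                           src_start_date='START_DATE', src_end_date='END_DATE',
                           src_eff='EFFECTIVE_FROM', src_ldts='LOAD_DATETIME',
                           src_source='RECORD_SOURCE', source_model='v_stg_customer_orders') }}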