Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dlt/common/destination/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
C_DLT_LOAD_ID,
TLoaderReplaceStrategy,
TTableFormat,
TTableSchema,
)
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.destination.exceptions import (
Expand Down Expand Up @@ -686,6 +687,14 @@ def should_truncate_table_before_load_on_staging_destination(self, table_name: s
"""
pass

def should_drop_table_on_staging_destination(self, dropped_table: TTableSchema) -> bool:
    """Tells if `dropped_table` should be dropped on the staging destination (regular dataset)
    in addition to being dropped on the final destination.

    This stays False for all destinations except Athena with non-iceberg tables, where the
    staging destination holds the actual data which needs to be deleted.

    Note that `dropped_table` may no longer be present in the schema. It is present only if it
    got recreated.
    """
    # default: dropped tables are not dropped on the staging destination
    return False


class SupportsOpenTables(ABC):
"""Provides access to data stored in one of open table formats (iceberg or delta) and intended to
Expand Down
14 changes: 14 additions & 0 deletions dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
TYPE_CHECKING,
)

from dlt.common.schema.exceptions import TableNotFound

if TYPE_CHECKING:
from mypy_boto3_lakeformation import LakeFormationClient
from mypy_boto3_lakeformation.type_defs import (
Expand All @@ -35,6 +37,7 @@
TColumnType,
TSchemaTables,
TSortOrder,
TTableSchema,
)
from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema
from dlt.common.destination.client import FollowupJobRequest, SupportsStagingDestination, LoadJob
Expand Down Expand Up @@ -345,6 +348,7 @@ def update_stored_schema(
if (
self.config.lakeformation_config is not None
and self.config.lakeformation_config.enabled
is not None # both True and False are actionable
):
self.manage_lf_tags()
return applied_update
Expand Down Expand Up @@ -406,6 +410,16 @@ def should_truncate_table_before_load_on_staging_destination(self, table_name: s
return True
return False

def should_drop_table_on_staging_destination(self, dropped_table: TTableSchema) -> bool:
    """Tells if `dropped_table` must also be dropped on the staging destination.

    In Athena the table is dropped in glue and, if the table is not iceberg, its data
    must be dropped in staging as well.
    """
    try:
        recreated_table = self.prepare_load_table(dropped_table["name"])
        recreated_as_iceberg = self._is_iceberg_table(recreated_table)
    except TableNotFound:
        # the table got dropped and was not recreated - drop it on the staging destination
        return True
    # do not drop data if a new iceberg table got created - storage is handled by Athena
    return not recreated_as_iceberg

def should_load_data_to_staging_dataset_on_staging_destination(self, table_name: str) -> bool:
"""iceberg table data goes into staging on staging destination"""
table = self.prepare_load_table(table_name)
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/athena/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

@configspec
class LakeformationConfig:
enabled: bool = False
enabled: Optional[bool] = None
tags: Optional[Dict[str, str]] = None


Expand Down
13 changes: 9 additions & 4 deletions dlt/destinations/impl/databricks/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ def run(self) -> None:
# decide if this is a local file or a staged file
is_local_file = not ReferenceFollowupJobRequest.is_reference_job(self._file_path)
if is_local_file:
# conn parameter staging_allowed_local_path must be set to use 'PUT/REMOVE volume_path' SQL statement
self._sql_client.native_connection.thrift_backend.staging_allowed_local_path = (
os.path.dirname(self._file_path)
)
# staging_allowed_local_path should be set when opening the connection but at that
# time we do not know this path so do it now
conn_ = self._sql_client.native_connection
file_dir = os.path.dirname(self._file_path)
if backend := getattr(conn_, "thrift_backend", None):
backend.staging_allowed_local_path = file_dir
else:
# thrift backend discontinued on newer databricks connector clients
conn_.staging_allowed_local_path = file_dir
# local file by uploading to a temporary volume on Databricks
from_clause, file_name, volume_path, volume_file_path = self._handle_local_file_upload(
self._file_path
Expand Down
3 changes: 3 additions & 0 deletions dlt/destinations/impl/dummy/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ def is_storage_initialized(self) -> bool:
def drop_storage(self) -> None:
pass

def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
    """Accepts a request to drop `tables` and deliberately does nothing.

    Both `tables` and `delete_schema` are ignored by this implementation.
    """
    # NOTE(review): presumably a no-op because the dummy destination keeps no physical
    # tables - confirm against the rest of the dummy client
    pass

def update_stored_schema(
self,
only_tables: Iterable[str] = None,
Expand Down
5 changes: 4 additions & 1 deletion dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ def initialize_package(
if isinstance(job_client, WithStagingDataset)
else None
),
lambda table_name: True, # drop all passed tables
drop_tables=dropped_tables,
truncate_tables=truncated_tables,
)
Expand All @@ -556,9 +557,11 @@ def initialize_package(
schema,
new_jobs,
expected_update,
job_client.should_truncate_table_before_load_on_staging_destination,
# should_truncate_staging,
job_client.should_truncate_table_before_load_on_staging_destination,
job_client.should_load_data_to_staging_dataset_on_staging_destination,
# should we drop tables also on staging destination
job_client.should_drop_table_on_staging_destination,
drop_tables=dropped_tables,
truncate_tables=truncated_tables,
)
Expand Down
15 changes: 11 additions & 4 deletions dlt/load/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def init_client(
expected_update: TSchemaTables,
truncate_filter: Callable[[str], bool],
load_staging_filter: Callable[[str], bool],
drop_staging_filter: Callable[[TTableSchema], bool],
drop_tables: Optional[List[TTableSchema]] = None,
truncate_tables: Optional[List[TTableSchema]] = None,
) -> TSchemaTables:
Expand All @@ -81,8 +82,9 @@ def init_client(
schema (Schema): The schema as in load package
new_jobs (Iterable[LoadJobInfo]): List of new jobs
expected_update (TSchemaTables): Schema update as in load package. Always present even if empty
truncate_filter (Callable[[str], bool]): A filter that tells which table in destination dataset should be truncated
load_staging_filter (Callable[[str], bool]): A filter which tell which table in the staging dataset may be loaded into
truncate_filter (Callable[[str], bool]): A filter that tells if table should be truncated
load_staging_filter (Callable[[str], bool]): A filter which tells if a table may be loaded into the staging dataset
drop_staging_filter (Callable[[TTableSchema], bool]): A filter which tells if a dropped table should also be dropped on the staging destination
drop_tables (Optional[List[TTableSchema]]): List of tables to drop before initializing storage
truncate_tables (Optional[List[TTableSchema]]): List of tables to truncate before initializing storage

Expand Down Expand Up @@ -111,8 +113,13 @@ def init_client(
)
)

# get tables to drop
drop_table_names = {table["name"] for table in drop_tables} if drop_tables else set()
# get tables to drop, note that drop_tables are not in schema and come from the package
# state
drop_table_names = (
{table["name"] for table in drop_tables if drop_staging_filter(table)}
if drop_tables
else set()
)
job_client.verify_schema(only_tables=tables_with_jobs | dlt_tables, new_jobs=new_jobs)
applied_update = _init_dataset_and_update_schema(
job_client,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ qdrant = [
"qdrant-client[fastembed]>=1.8"
]
databricks = [
"databricks-sql-connector>=2.9.3,<4 ; python_version <= '3.12'",
"databricks-sql-connector>=2.9.3 ; python_version <= '3.12'",
"databricks-sql-connector>=3.6.0 ; python_version >= '3.13'",
"databricks-sdk>=0.38.0",
]
Expand Down
4 changes: 3 additions & 1 deletion tests/load/athena_iceberg/test_lakeformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ def create_pipelines(
)
lf_disabled_pipeline = destination_config.setup_pipeline(
pipeline_name,
destination=destination_config.destination_factory(),
destination=destination_config.destination_factory(
lakeformation_config=LakeformationConfig(enabled=False)
),
dataset_name=dataset_name,
staging=staging_destination,
)
Expand Down
10 changes: 7 additions & 3 deletions tests/load/pipeline/test_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("lf_enabled", [True, False], ids=["lf-on", "lf-off"])
@pytest.mark.parametrize(
"lf_enabled", [True, False, None], ids=["lf-on", "lf-off", "lf-passthrough"]
)
def test_athena_lakeformation_config_gating(
destination_config: DestinationTestConfiguration, lf_enabled: bool, mocker, monkeypatch
) -> None:
# Configure Lake Formation gating via env (read by client config)
monkeypatch.setenv("DESTINATION__LAKEFORMATION_CONFIG__ENABLED", str(lf_enabled))
if lf_enabled is not None:
monkeypatch.setenv("DESTINATION__LAKEFORMATION_CONFIG__ENABLED", str(lf_enabled))

pipeline = destination_config.setup_pipeline("athena_" + uniq_id(), dev_mode=True)

Expand All @@ -55,7 +58,8 @@ def test_athena_lakeformation_config_gating(
)

client.update_stored_schema()
if lf_enabled:
# enabling or disabling the flag will add or remove tags respectively, None will skip tag management
if lf_enabled is not None:
mocked_manage.assert_called()
else:
mocked_manage.assert_not_called()
Expand Down
51 changes: 30 additions & 21 deletions tests/load/pipeline/test_refresh_modes.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def some_data_4():
"destination_config",
destinations_configs(
default_sql_configs=True,
subset=["duckdb", "filesystem", "iceberg"],
subset=["duckdb", "filesystem", "iceberg", "athena"],
local_filesystem_configs=True,
table_format_local_configs=True,
),
Expand All @@ -110,20 +110,23 @@ def some_data_4():
def test_refresh_drop_sources(
destination_config: DestinationTestConfiguration, in_source: bool, with_wipe: bool
):
pipeline = destination_config.setup_pipeline("refresh_source")
pipeline_name = "refresh_source"
dataset_name = pipeline_name + uniq_id()
pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)

data: Any = refresh_source(first_run=True, drop_sources=True)
if not in_source:
data = list(data.selected_resources.values())

# First run pipeline so destination so tables are created
# first run the pipeline so tables are created in the destination
info = pipeline.run(data, refresh="drop_sources", **destination_config.run_kwargs)
assert_load_info(info)
assert table_exists(pipeline, "some_data_3")

# Second run of pipeline with only selected resources
# second run of pipeline with only selected resources
if with_wipe:
pipeline._wipe_working_folder()
pipeline = destination_config.setup_pipeline("refresh_source")
pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)

data = refresh_source(first_run=False, drop_sources=True).with_resources(
"some_data_1", "some_data_2"
Expand All @@ -142,16 +145,16 @@ def test_refresh_drop_sources(
"some_data_2",
}

# No "name" column should exist as table was dropped and re-created without it
# no "name" column should exist as table was dropped and re-created without it
assert_only_table_columns(pipeline, "some_data_1", ["id"])
data = load_tables_to_dicts(pipeline, "some_data_1")["some_data_1"]
result = sorted([row["id"] for row in data])
# Only rows from second run should exist
# only rows from second run should exist
assert result == [3, 4]

# Confirm resource tables not selected on second run got dropped
# confirm resource tables not selected on second run got dropped
assert not table_exists(pipeline, "some_data_3")
# Loaded state is wiped
# loaded state is wiped
with pipeline.destination_client() as dest_client:
destination_state = load_pipeline_state_from_destination(
pipeline.pipeline_name, dest_client # type: ignore[arg-type]
Expand All @@ -173,7 +176,9 @@ def test_existing_schema_hash(destination_config: DestinationTestConfiguration):
"""Test when new schema is identical to a previously stored schema after dropping and re-creating tables.
The change should be detected regardless and tables are created again in destination db
"""
pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
pipeline = destination_config.setup_pipeline(
"refresh_full_test", refresh="drop_sources", dev_mode=True
)

info = pipeline.run(
refresh_source(first_run=True, drop_sources=True), **destination_config.run_kwargs
Expand Down Expand Up @@ -210,15 +215,13 @@ def test_existing_schema_hash(destination_config: DestinationTestConfiguration):
assert new_schema_hash == first_schema_hash


pytest.mark.essential


@pytest.mark.essential
@pytest.mark.parametrize(
"destination_config",
destinations_configs(
default_sql_configs=True,
local_filesystem_configs=True,
subset=["duckdb", "filesystem", "iceberg"],
subset=["duckdb", "filesystem", "iceberg", "athena"],
table_format_local_configs=True,
),
ids=lambda x: x.name,
Expand All @@ -232,19 +235,21 @@ def test_refresh_drop_resources(
pytest.skip("not needed")

# First run pipeline with load to destination so tables are created
pipeline = destination_config.setup_pipeline("refresh_source")
pipeline_name = "refresh_source"
dataset_name = pipeline_name + uniq_id()
pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)

data: Any = refresh_source(first_run=True)
if not in_source:
data = list(data.selected_resources.values())

info = pipeline.run(data, refresh="drop_resources", **destination_config.run_kwargs)
info = pipeline.run(data, **destination_config.run_kwargs)
assert_load_info(info)

# Second run of pipeline with only selected resources
if with_wipe:
pipeline._wipe_working_folder()
pipeline = destination_config.setup_pipeline("refresh_source")
pipeline = destination_config.setup_pipeline(pipeline_name, dataset_name=dataset_name)

data = refresh_source(first_run=False).with_resources("some_data_1", "some_data_2")
if not in_source:
Expand Down Expand Up @@ -298,7 +303,9 @@ def test_refresh_drop_resources(
def test_refresh_drop_data_only(destination_config: DestinationTestConfiguration):
"""Refresh drop_data should truncate all selected tables before load"""
# First run pipeline with load to destination so tables are created
pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_data")
pipeline = destination_config.setup_pipeline(
"refresh_full_test", refresh="drop_data", dev_mode=True
)

info = pipeline.run(
refresh_source(first_run=True), write_disposition="append", **destination_config.run_kwargs
Expand Down Expand Up @@ -408,7 +415,9 @@ def source_2_data_2():
yield source_2_data_1
yield source_2_data_2

pipeline = destination_config.setup_pipeline("refresh_full_test", refresh="drop_sources")
pipeline = destination_config.setup_pipeline(
"refresh_full_test", refresh="drop_sources", dev_mode=True
)

# Run both sources
info = pipeline.run(
Expand Down Expand Up @@ -459,7 +468,7 @@ def source_2_data_2():
ids=lambda x: x.name,
)
def test_refresh_argument_to_run(destination_config: DestinationTestConfiguration):
pipeline = destination_config.setup_pipeline("refresh_full_test")
pipeline = destination_config.setup_pipeline("refresh_full_test", dev_mode=True)

info = pipeline.run(refresh_source(first_run=True), **destination_config.run_kwargs)
assert_load_info(info)
Expand Down Expand Up @@ -495,7 +504,7 @@ def test_refresh_argument_to_run(destination_config: DestinationTestConfiguratio
ids=lambda x: x.name,
)
def test_refresh_argument_to_extract(destination_config: DestinationTestConfiguration):
pipeline = destination_config.setup_pipeline("refresh_full_test")
pipeline = destination_config.setup_pipeline("refresh_full_test", dev_mode=True)

info = pipeline.run(refresh_source(first_run=True), **destination_config.run_kwargs)
assert_load_info(info)
Expand Down
Loading
Loading