to_iceberg: conditional merge #3173

@thnd23

Description

Is your feature request related to a problem? Please describe.
The to_iceberg method does not support conditional merges. This is highly desirable, because otherwise the following arguments:

    merge_cols: list[str] | None = None,
    merge_condition: Literal["update", "ignore"] = "update",

cannot handle non-chronological data and can overwrite more recent records with older ones.

Describe the solution you'd like
Introduce an additional merge_condition literal, "conditional_merge", and an optional argument, conditional_merge_string, that holds the SQL predicate to apply when rows match.

Extend the following segment of code:

    if merge_cols:
        if merge_condition == "update":
            match_condition = f"""WHEN MATCHED THEN
                UPDATE SET {", ".join([f'"{x}" = source."{x}"' for x in df.columns])}"""
        else:
            match_condition = ""

with one elif statement:

        elif merge_condition == "conditional_merge":
            match_condition = f"""WHEN MATCHED AND {conditional_merge_string} THEN
                UPDATE SET {", ".join([f'"{x}" = source."{x}"' for x in df.columns])}"""
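
To show how the three branches would fit together, here is a minimal standalone sketch. The helper name build_match_condition is hypothetical and not part of awswrangler. The sketch also assumes that the existing table is aliased as target in the generated MERGE statement, which the snippet above does not show.

```python
from typing import Literal, Optional, Sequence


def build_match_condition(
    columns: Sequence[str],
    merge_condition: Literal["update", "ignore", "conditional_merge"] = "update",
    conditional_merge_string: Optional[str] = None,
) -> str:
    """Build the WHEN MATCHED clause of a MERGE statement (hypothetical helper)."""
    # Same SET list as in the existing code: every column is taken from source.
    set_clause = ", ".join(f'"{x}" = source."{x}"' for x in columns)

    if merge_condition == "update":
        return f"WHEN MATCHED THEN UPDATE SET {set_clause}"
    if merge_condition == "conditional_merge":
        # The proposed branch needs a predicate; fail early if it is missing.
        if not conditional_merge_string:
            raise ValueError(
                "conditional_merge_string is required for 'conditional_merge'"
            )
        return (
            f"WHEN MATCHED AND {conditional_merge_string} "
            f"THEN UPDATE SET {set_clause}"
        )
    # "ignore": matched rows are left untouched.
    return ""


# Only overwrite a matched row when the incoming record is newer.
# The "target" alias and the "updated_at" column are assumptions.
clause = build_match_condition(
    ["id", "updated_at"],
    merge_condition="conditional_merge",
    conditional_merge_string='source."updated_at" > target."updated_at"',
)
print(clause)
```

Since conditional_merge_string is interpolated into the query verbatim, it should only come from trusted code, not from user input.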

Describe alternatives you've considered
Writing Athena queries directly and bypassing the entire _write_iceberg.py implementation.
