Contributor

@mxm mxm commented Aug 21, 2025

This feature allows modifying table properties on the fly. Users have reported that they need to update table properties of both new and existing tables. Some catalogues even check for certain properties and reject table changes if those properties are not present.

Below is the documentation added as part of this PR:


Dynamic Table Properties

The Dynamic Sink supports dynamically updating table properties. This feature allows you to:

  • Set different properties for different tables based on table names
  • Apply properties during table creation
  • Update properties for existing tables

TablePropertiesUpdater Interface

The TablePropertiesUpdater is an interface that receives the fully-qualified table name and current properties, then returns the updated properties:

interface TablePropertiesUpdater extends Serializable {
    Map<String, String> apply(String tableName, Map<String, String> currentProperties);
}

Usage Example

TablePropertiesUpdater updater = (tableName, currentProps) -> {
    Map<String, String> updatedProps = new HashMap<>(currentProps);

    // Set compression based on table name
    if (tableName.contains("logs")) {
        updatedProps.put("write.parquet.compression-codec", "gzip");
    } else {
        updatedProps.put("write.parquet.compression-codec", "snappy");
    }

    // Set format properties
    updatedProps.put("write.format.default", "parquet");

    // Add custom metadata
    updatedProps.put("created.by", "dynamic-sink");
    updatedProps.put("table.identifier", tableName);

    // Remove table properties
    updatedProps.remove("to.be.removed.prop");

    return updatedProps;
};

DynamicIcebergSink.forInput(dataStream)
    .generator(new CustomRecordGenerator())
    .catalogLoader(catalogLoader)
    .tablePropertiesUpdater(updater)
    .append();

try {
UpdateProperties updateApi = table.updateProperties();

// Remove properties that are no longer present
Contributor

@swapna267 swapna267 Aug 21, 2025

So we consider the Flink job the source of truth for properties and remove properties from tables if they were added by external pipelines?

In large-scale ingestion pipelines, if we need to add a new property, we will then need to update the TablePropertiesUpdaterImpl and redeploy, right? This may not be feasible, as it may also put too much pressure on the catalog on startup.

But that also means, there is no one source of truth :)

Contributor Author

If this feature is enabled, which is optional, the job is the source of truth.

> In large-scale ingestion pipelines, if we need to add a new property, we will then need to update the TablePropertiesUpdaterImpl and redeploy, right? This may not be feasible, as it may also put too much pressure on the catalog on startup.

This is no different from other operations (e.g. table creation, schema changes) that the sink may perform.

Contributor

I was thinking that updating table properties across all tables (via a TablePropertiesUpdater code update) is more common than schema updates to all tables at the same time on startup.

Contributor Author

I'm not 100% sure though how the "needsUpdate()" method will make a difference. If the user returns the same properties, we will already skip the update. Concerning the time of the update, I think properties will have to be updated on first seeing the table (that is, if there are changes to the properties). Not sure if there is a way around it.
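For illustration, the "skip the update if nothing changed" behaviour described above can be sketched as a plain diff between the current properties and the map the updater returns. This is a minimal sketch under stated assumptions, not the PR's implementation: `PropertyDiff` and its fields are hypothetical names, and the only behaviour assumed is that keys missing from the returned map are removed and new or changed keys are set.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration; not a class from the PR. */
final class PropertyDiff {
    /** Keys that are new or whose value changed. */
    final Map<String, String> toSet = new HashMap<>();
    /** Keys present on the table but absent from the updater's result. */
    final Set<String> toRemove = new TreeSet<>();

    static PropertyDiff between(Map<String, String> current, Map<String, String> updated) {
        PropertyDiff diff = new PropertyDiff();
        // Properties the updater dropped are scheduled for removal.
        for (String key : current.keySet()) {
            if (!updated.containsKey(key)) {
                diff.toRemove.add(key);
            }
        }
        // Properties that are new or changed are scheduled to be set.
        for (Map.Entry<String, String> entry : updated.entrySet()) {
            if (!Objects.equals(entry.getValue(), current.get(entry.getKey()))) {
                diff.toSet.put(entry.getKey(), entry.getValue());
            }
        }
        return diff;
    }

    /** An empty diff means no catalog call is needed at all. */
    boolean isEmpty() {
        return toSet.isEmpty() && toRemove.isEmpty();
    }
}
```

With Iceberg's `UpdateProperties` API, a non-empty diff would presumably translate to one `set(key, value)` call per entry in `toSet`, one `remove(key)` call per entry in `toRemove`, and a single `commit()`; an empty diff would skip the API entirely.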

}
}

private void updateTablePropertiesIfNeeded(TableIdentifier identifier) {
Contributor

This basically runs for every DynamicRecord, right?
Can we avoid this check for every DynamicRecord in cases where the user's TablePropertiesUpdater does not vary with or depend on the currentProperties of the table?

I am wondering whether it is possible to extend the TablePropertiesUpdater interface with a needsRefresh() method that defaults to true, so that users can implement less expensive checks (e.g. false for the first record of a table in a task).
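A rough sketch of what the needsRefresh() proposal could look like, using a Java default method so existing lambdas keep working. Everything here is hypothetical: `RefreshAwareUpdater` and `OncePerTableUpdater` are illustrative names, the `needsRefresh(String)` signature is an assumption, and the once-per-table policy is only one possible reading of a "less expensive check".

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical variant of the updater interface; needsRefresh() is the reviewer's proposal, not an existing API. */
@FunctionalInterface
interface RefreshAwareUpdater extends Serializable {
    Map<String, String> apply(String tableName, Map<String, String> currentProperties);

    /** Defaults to true, so the properties check still runs for every record unless overridden. */
    default boolean needsRefresh(String tableName) {
        return true;
    }
}

/** Example policy (an assumption): run the properties check only the first time a table is seen. */
final class OncePerTableUpdater implements RefreshAwareUpdater {
    private final Set<String> seenTables = new HashSet<>();

    @Override
    public Map<String, String> apply(String tableName, Map<String, String> currentProperties) {
        Map<String, String> updated = new HashMap<>(currentProperties);
        updated.put("write.format.default", "parquet");
        return updated;
    }

    @Override
    public boolean needsRefresh(String tableName) {
        // Set.add returns true only when the table name was not present before.
        return seenTables.add(tableName);
    }
}
```

Because the interface keeps a single abstract method, it remains usable as a lambda; only users who want a cheaper check would need to write a class.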

Contributor Author

This check isn't that expensive:

  1. Table properties are cached
  2. There aren't any calls to the UpdateProperties API if properties for the table did not change
  3. Also, if there is no handler specified, this check will be skipped entirely (feature is disabled then)

> Can we avoid this check for every DynamicRecord in cases where the user's TablePropertiesUpdater does not vary with or depend on the currentProperties of the table?

We can certainly do that.
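The three points listed in this reply (cached properties, no UpdateProperties call when nothing changed, and no check at all without a handler) can be sketched as follows. This is a simplified model under stated assumptions rather than the sink's actual code: `CachedPropertiesCheck` is a hypothetical class, the in-memory map stands in for the table metadata cache, and the counter stands in for calls to the catalog.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical model of the per-record check; not a class from the PR. */
final class CachedPropertiesCheck {
    private final BiFunction<String, Map<String, String>, Map<String, String>> updater; // null disables the feature
    private final Map<String, Map<String, String>> cachedProperties = new HashMap<>();
    private int commitCount = 0; // stands in for UpdateProperties commits against the catalog

    CachedPropertiesCheck(BiFunction<String, Map<String, String>, Map<String, String>> updater) {
        this.updater = updater;
    }

    void onRecord(String tableName) {
        // 3. No handler configured: the check is skipped entirely.
        if (updater == null) {
            return;
        }
        // 1. Properties come from a cache (here: an empty map on first sight of a table).
        Map<String, String> current =
            cachedProperties.computeIfAbsent(tableName, name -> new HashMap<>());
        Map<String, String> updated =
            updater.apply(tableName, Collections.unmodifiableMap(current));
        // 2. Unchanged properties: no call to the update API.
        if (updated.equals(current)) {
            return;
        }
        commitCount++;
        cachedProperties.put(tableName, new HashMap<>(updated));
    }

    int commitCount() {
        return commitCount;
    }
}
```

In this model, repeated records for the same table cost one updater invocation and one map comparison each, while only the first record (or a later change in the updater's output) triggers a commit.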

import java.util.Map;

@FunctionalInterface
public interface TablePropertiesUpdater extends Serializable {
Contributor

Could you please elaborate why we need an updater interface for the properties?
Why not just an expected Map<String, String> for the values? The DynamicRecord could contain the expected properties map for the table, and we can act upon that

Contributor Author

I was contemplating adding the properties to DynamicRecord, but I decided against it because table properties aren't directly connected to the data.

Perhaps there are other table-related settings like the table location which users want to control (this has already been requested by users). The location would only be settable during table creation. Using a separate interface we would be better able to express this intent.

Contributor

As per our discussion the next request will be to change the location of the table on table creation.
Maybe we should delegate the table creation to the user and they can manipulate whatever they want.

I'm a little bit more conflicted on the update. Is it really something we have a use case for? Why would we want to update some properties of an already existing table? This seems like a different problem than writing into a table. Maybe, if this is a requirement, they can do it concurrently in a parallel flow? The properties should not affect the writing, so parallel writes could continue even if the properties need to be changed.

// Apply table properties during table creation if updater is provided
Map<String, String> properties = Maps.newHashMap();
if (tablePropertiesUpdater != null) {
properties = tablePropertiesUpdater.apply(identifier.toString(), properties);

Instead of passing in identifier.toString() as the tableName in TablePropertiesUpdater, could we just modify TablePropertiesUpdater to accept a TableIdentifier? This may be a bit more accurate, since identifier.toString() is composed of both the namespace as well as the tableName. Plus, users may want to update properties such as write.data.path based on both namespace and/or tableName.
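A minimal sketch of the identifier-based variant suggested here. To keep the example self-contained, `TableId` is a hypothetical stand-in for Iceberg's `TableIdentifier` (which exposes the namespace and the table name separately); `IdentifierAwareUpdater` and the bucket path are likewise assumptions made for illustration.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for org.apache.iceberg.catalog.TableIdentifier. */
final class TableId {
    final String namespace;
    final String name;

    TableId(String namespace, String name) {
        this.namespace = namespace;
        this.name = name;
    }

    @Override
    public String toString() {
        return namespace + "." + name;
    }
}

/** Hypothetical updater variant that receives the structured identifier instead of a String. */
@FunctionalInterface
interface IdentifierAwareUpdater extends Serializable {
    Map<String, String> apply(TableId identifier, Map<String, String> currentProperties);
}

final class IdentifierAwareExample {
    /** Derives write.data.path from namespace and table name; the bucket is a made-up example. */
    static final IdentifierAwareUpdater UPDATER = (id, current) -> {
        Map<String, String> updated = new HashMap<>(current);
        updated.put(
            "write.data.path",
            "s3://example-bucket/" + id.namespace + "/" + id.name + "/data");
        return updated;
    };
}
```

With a structured identifier the updater does not have to parse the namespace back out of a dotted string, which would be ambiguous for multi-level namespaces.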

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Oct 17, 2025
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
