Core, Spark: add snapshot properties for snapshot created by wap branches #13922
add snapshot properties for wap created snapshots
assertThat(
        table
            .snapshot(table.refs().get(BRANCH).snapshotId())
            .summary()
            .get(SnapshotSummary.WAP_BRANCH_PROP))
    .isEqualTo(BRANCH);
We should use AbstractMapAssert's methods for better failure messages:
assertThat(table.snapshot(table.refs().get(BRANCH).snapshotId()).summary())
    .containsEntry(SnapshotSummary.WAP_BRANCH_PROP, BRANCH);

assertThat(
        table
            .snapshot(table.refs().get("main").snapshotId())
            .summary()
            .get(SnapshotSummary.WAP_BRANCH_PROP))
    .isNull();
We should use AbstractMapAssert's methods for better failure messages:
assertThat(table.snapshot(table.refs().get("main").snapshotId()).summary())
    .doesNotContainKey(SnapshotSummary.WAP_BRANCH_PROP);

public static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced";
public static final String KEPT_MANIFESTS_COUNT = "manifests-kept";
public static final String PROCESSED_MANIFEST_ENTRY_COUNT = "entries-processed";
public static final String WAP_BRANCH_PROP = "wap.branch";
How about moving this constant under STAGED_WAP_ID_PROP?
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
Thanks for the comments @ebyhr @huaxingao. I have addressed them, so let me know what you think!
assertThatCode(
        () -> {
          sql("INSERT INTO %s VALUES (4, 'd')", tableName);
          Table table = validationCatalog.loadTable(tableIdent);
          assertThat(table.snapshot(table.refs().get("main").snapshotId()).summary())
              .doesNotContainKey(SnapshotSummary.WAP_BRANCH_PROP);
        })
    .doesNotThrowAnyException();
Why do we need the outer assertThatCode()...doesNotThrowAnyException() method?
Same for TestPartitionedWritesToWapBranch.
yeah, not really needed. Remove the outer assertion.
@yingjianwu98 Could you add coverage for the SparkPositionDeltaWrite code path?
@RussellSpitzer Could you please take a look at this PR too when you have a moment? Thanks!
if (wapEnabled) {
  if (wapId != null) {
    operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
    operation.stageOnly();
Setting this to stageOnly sounds like it's outside the concept of setting properties to me.
      .parse();
}

private String getWapBranch() {
nit: We don't use get in our methods for getters
// write-audit-publish is enabled for this table and job
// stage the changes without changing the current snapshot
operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
operation.stageOnly();
I mentioned this above, but I'm not comfortable extracting stageOnly here. That seems a bit different from "setWapProperties". Perhaps a rename would make this clearer, but currently it feels like we are hiding an important state change on operation.
I think my big comment here is I have no problem with introducing the wap.branch property, but I don't think we should couple the "staging" logic in the function which sets that property. I think if we isolate the change to just setting either wap.id or branch, we are all set. -- Note we may also add to the Spec index
    boolean wapEnabled,
    String wapId,
    String branch,
    Predicate<String> isWapBranch) {
Why do we need a functional interface here?
I initially wanted to pass the SparkWriteConf, but that would introduce an extra Spark dependency in the util module.
Hmmm let's try to think of an alternative here? Maybe we just only pass through "branch" if it is a Wap branch?
So have wapId and wapBranch as parameters
Sure, sounds good. I also agree with separating the stageOnly commit into its own function, so that this function's only responsibility is setting the snapshot properties.
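A minimal sketch of the split agreed on in this thread, assuming the helper takes only wapId and wapBranch and leaves staging to the caller. `PendingOperation`, `RecordingOperation`, `WapPropertiesSketch`, and `prepareCommit` are hypothetical stand-ins so the example runs without Iceberg or Spark; they are not the PR's actual code. The sketch also assumes the caller passes a null wapId when WAP is disabled, and that STAGED_WAP_ID_PROP resolves to "wap.id".

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the two calls this thread discusses on a pending
// snapshot operation; it is NOT Iceberg's real interface.
interface PendingOperation {
  void set(String property, String value);

  void stageOnly();
}

// In-memory fake that records what was requested, so the sketch is runnable.
final class RecordingOperation implements PendingOperation {
  final Map<String, String> summary = new LinkedHashMap<>();
  boolean staged = false;

  @Override
  public void set(String property, String value) {
    summary.put(property, value);
  }

  @Override
  public void stageOnly() {
    staged = true;
  }
}

final class WapPropertiesSketch {
  // "wap.branch" is the value added in this PR; "wap.id" is assumed here.
  static final String STAGED_WAP_ID_PROP = "wap.id";
  static final String WAP_BRANCH_PROP = "wap.branch";

  private WapPropertiesSketch() {}

  // Only records summary properties; it never changes how the commit happens.
  static void setWapProperties(PendingOperation operation, String wapId, String wapBranch) {
    if (wapId != null) {
      operation.set(STAGED_WAP_ID_PROP, wapId);
    }
    if (wapBranch != null) {
      operation.set(WAP_BRANCH_PROP, wapBranch);
    }
  }

  // Hypothetical caller: the stageOnly() state change stays visible here.
  static void prepareCommit(PendingOperation operation, String wapId, String wapBranch) {
    setWapProperties(operation, wapId, wapBranch);
    if (wapId != null) {
      operation.stageOnly();
    }
  }
}
```

Keeping `stageOnly()` in the caller means a reader of the commit path sees the state change directly instead of finding it inside a property setter.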
  return confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
}

public boolean isWapBranch(String branchName) {
I'm not sure why we need this method? Aren't we just always returning true if the wapBranch is not null and matches the branch being written to?
Ah, my thinking is that only WAP branches will have this snapshot property; for other branches, we won't set it.
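The check discussed here could be reduced to a plain comparison, roughly as below. This is a sketch under assumptions: `WapBranchSketch` and `wapBranchFor` are hypothetical names, and the rule (WAP enabled, a WAP branch configured, and the write targeting that same branch) is inferred from the review comments rather than taken from the merged code.

```java
final class WapBranchSketch {
  private WapBranchSketch() {}

  // Returns the branch to record under "wap.branch", or null when the write
  // does not target the configured WAP branch (so no property is set).
  static String wapBranchFor(boolean wapEnabled, String configuredWapBranch, String targetBranch) {
    if (!wapEnabled || configuredWapBranch == null) {
      return null;
    }
    return configuredWapBranch.equals(targetBranch) ? configuredWapBranch : null;
  }
}
```

With this shape, the caller resolves the branch once and passes it through as a nullable wapBranch parameter, so no `Predicate<String>` or Spark-specific configuration type has to reach the util module.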
Thanks everyone for the review again. I have updated the PR based on the suggestions and refactored the code.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
As we transition to using WAP with branches, we’ve noticed a gap in debugging observability compared to the previous approach using wap.id. Specifically, it’s difficult to determine which snapshots were created by WAP branches.
This PR addresses the issue by setting snapshot properties on snapshots that are created by WAP branches, making it easier to trace their origin and improve overall observability.
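To illustrate the observability gain, here is a rough sketch of how a debugging tool might pick out snapshots written through a given WAP branch once the property is present. `SnapshotInfo` and `WapSnapshotFilter` are hypothetical stand-ins for a snapshot's id and summary map, not Iceberg classes; only the "wap.branch" key comes from this PR.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, minimal view of a snapshot: its id plus its summary map.
final class SnapshotInfo {
  final long snapshotId;
  final Map<String, String> summary;

  SnapshotInfo(long snapshotId, Map<String, String> summary) {
    this.snapshotId = snapshotId;
    this.summary = summary;
  }
}

final class WapSnapshotFilter {
  // Summary key introduced by this PR.
  static final String WAP_BRANCH_PROP = "wap.branch";

  private WapSnapshotFilter() {}

  // Ids of the snapshots whose summary says they were written to the given WAP branch.
  static List<Long> createdOnWapBranch(List<SnapshotInfo> snapshots, String branch) {
    return snapshots.stream()
        .filter(snapshot -> branch.equals(snapshot.summary.get(WAP_BRANCH_PROP)))
        .map(snapshot -> snapshot.snapshotId)
        .collect(Collectors.toList());
  }
}
```

Snapshots committed outside a WAP branch carry no "wap.branch" entry, so they simply drop out of the result.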