Skip to content

Commit 9357c6f

Browse files
[Kernel][Kernel UC] Make startBoundary required for CommitRange (#5537)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title. For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Currently, when building a commit range, the start boundary is optional and defaults to version 0 when not provided. However, there is no good use case in which the start boundary would be omitted, so this PR makes it required: `CommitRangeBuilder.withStartBoundary` is removed, and the start boundary is instead passed directly to `TableManager.loadCommitRange(path, startBoundary)`. Callers can always reproduce the prior default behavior by providing a start boundary of version 0. ## How was this patch tested? Updates existing tests and adds new ones.
1 parent 417b3e9 commit 9357c6f

File tree

12 files changed

+198
-198
lines changed

12 files changed

+198
-198
lines changed

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManager.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -131,8 +131,8 @@ public void checkVersionExists(long version, boolean mustBeRecreatable, boolean
131131
@Override
132132
public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
133133
CommitRangeBuilder builder =
134-
TableManager.loadCommitRange(tablePath)
135-
.withStartBoundary(CommitRangeBuilder.CommitBoundary.atVersion(startVersion));
134+
TableManager.loadCommitRange(
135+
tablePath, CommitRangeBuilder.CommitBoundary.atVersion(startVersion));
136136

137137
if (endVersion.isPresent()) {
138138
builder =

kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -59,10 +59,9 @@ public interface CommitRange {
5959
* <p>The boundary indicates whether the range was defined using a specific version number or a
6060
* timestamp.
6161
*
62-
* @return an {@link Optional} containing the start boundary, or empty if the range was created
63-
* with default start parameters (version 0)
62+
* @return the start boundary for this commit range
6463
*/
65-
Optional<CommitRangeBuilder.CommitBoundary> getQueryStartBoundary();
64+
CommitRangeBuilder.CommitBoundary getQueryStartBoundary();
6665

6766
/**
6867
* Returns the original query boundary used to define the end boundary of this commit range, if

kernel/kernel-api/src/main/java/io/delta/kernel/CommitRangeBuilder.java

Lines changed: 3 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -29,25 +29,15 @@
2929
* A builder for creating {@link CommitRange} instances that define a contiguous range of commits in
3030
* a Delta Lake table.
3131
*
32-
* <p>If no start specification is provided, the range defaults to starting at version 0. If no end
33-
* specification is provided, the range defaults to the latest available version.
32+
* <p>The start boundary is required and provided via {@link TableManager#loadCommitRange(String,
33+
* CommitBoundary)}. If no end specification is provided, the range defaults to the latest available
34+
* version.
3435
*
3536
* @since 3.4.0
3637
*/
3738
@Experimental
3839
public interface CommitRangeBuilder {
3940

40-
/**
41-
* Configures the builder to start the commit range at a specific version or timestamp.
42-
*
43-
* <p>If not specified, the commit range will default to starting at version 0.
44-
*
45-
* @param startBoundary the boundary specification for the start of the commit range, must not be
46-
* null
47-
* @return this builder instance configured with the specified start boundary
48-
*/
49-
CommitRangeBuilder withStartBoundary(CommitBoundary startBoundary);
50-
5141
/**
5242
* Configures the builder to end the commit range at a specific version or timestamp.
5343
*

kernel/kernel-api/src/main/java/io/delta/kernel/TableManager.java

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -64,14 +64,17 @@ static CreateTableTransactionBuilder buildCreateTableTransaction(
6464
/**
6565
* Creates a builder for loading a CommitRange at a given path.
6666
*
67-
* <p>The returned builder can be configured with start version or timestamp and an end version or
68-
* timestamp, and with additional metadata to optimize the loading process.
67+
* <p>The returned builder can be configured with an end version or timestamp, and with additional
68+
* metadata to optimize the loading process.
6969
*
7070
* @param path the file system path to the Delta table
71+
* @param startBoundary the boundary specification for the start of the commit range, must not be
72+
* null
7173
* @return a {@link CommitRangeBuilder} that can be used to load a {@link CommitRange} at the
7274
* given path
7375
*/
74-
static CommitRangeBuilder loadCommitRange(String path) {
75-
return new CommitRangeBuilderImpl(path);
76+
static CommitRangeBuilder loadCommitRange(
77+
String path, CommitRangeBuilder.CommitBoundary startBoundary) {
78+
return new CommitRangeBuilderImpl(path, startBoundary);
7679
}
7780
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeBuilderImpl.java

Lines changed: 9 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -38,31 +38,26 @@ public class CommitRangeBuilderImpl implements CommitRangeBuilder {
3838

3939
public static class Context {
4040
public final String unresolvedPath;
41-
public Optional<CommitBoundary> startBoundaryOpt = Optional.empty();
41+
public final CommitBoundary startBoundary;
4242
public Optional<CommitBoundary> endBoundaryOpt = Optional.empty();
4343
public List<ParsedLogData> logDatas = Collections.emptyList();
4444

45-
public Context(String unresolvedPath) {
45+
public Context(String unresolvedPath, CommitBoundary startBoundary) {
4646
this.unresolvedPath = requireNonNull(unresolvedPath, "unresolvedPath is null");
47+
this.startBoundary = requireNonNull(startBoundary, "startBoundary is null");
4748
}
4849
}
4950

5051
private final Context ctx;
5152

52-
public CommitRangeBuilderImpl(String unresolvedPath) {
53-
ctx = new Context(unresolvedPath);
53+
public CommitRangeBuilderImpl(String unresolvedPath, CommitBoundary startBoundary) {
54+
ctx = new Context(unresolvedPath, startBoundary);
5455
}
5556

5657
///////////////////////////////////////
5758
// Public CommitRangeBuilder Methods //
5859
///////////////////////////////////////
5960

60-
@Override
61-
public CommitRangeBuilderImpl withStartBoundary(CommitBoundary startBoundary) {
62-
ctx.startBoundaryOpt = Optional.of(requireNonNull(startBoundary, "startBoundary is null"));
63-
return this;
64-
}
65-
6661
@Override
6762
public CommitRangeBuilderImpl withEndBoundary(CommitBoundary endBoundary) {
6863
ctx.endBoundaryOpt = Optional.of(requireNonNull(endBoundary, "endBoundary is null"));
@@ -86,9 +81,10 @@ public CommitRange build(Engine engine) {
8681
////////////////////////////
8782

8883
private void validateInputOnBuild() {
89-
// Validate that start boundary is less than or equal to end boundary if both are provided
90-
if (ctx.startBoundaryOpt.isPresent() && ctx.endBoundaryOpt.isPresent()) {
91-
CommitBoundary startBoundary = ctx.startBoundaryOpt.get();
84+
// Validate that start boundary is less than or equal to end boundary if end boundary is
85+
// provided
86+
if (ctx.endBoundaryOpt.isPresent()) {
87+
CommitBoundary startBoundary = ctx.startBoundary;
9288
CommitBoundary endBoundary = ctx.endBoundaryOpt.get();
9389

9490
// If both are version-based, compare versions

kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeFactory.java

Lines changed: 7 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -69,28 +69,22 @@ CommitRangeImpl create(Engine engine) {
6969
logger.info("{}: Resolved end-boundary to the latest version {}", tablePath, endVersion);
7070
}
7171
return new CommitRangeImpl(
72-
tablePath, ctx.startBoundaryOpt, ctx.endBoundaryOpt, startVersion, endVersion, deltas);
72+
tablePath, ctx.startBoundary, ctx.endBoundaryOpt, startVersion, endVersion, deltas);
7373
}
7474

7575
private long resolveStartVersion(Engine engine, List<ParsedCatalogCommitData> catalogCommits) {
76-
if (!ctx.startBoundaryOpt.isPresent()) {
77-
// Default to version 0 if no start boundary is provided
78-
return 0L;
79-
}
80-
CommitRangeBuilder.CommitBoundary startBoundary = ctx.startBoundaryOpt.get();
81-
82-
if (startBoundary.isVersion()) {
83-
return startBoundary.getVersion();
76+
if (ctx.startBoundary.isVersion()) {
77+
return ctx.startBoundary.getVersion();
8478
} else {
8579
logger.info(
8680
"{}: Trying to resolve start-boundary timestamp {} to version",
8781
tablePath,
88-
startBoundary.getTimestamp());
82+
ctx.startBoundary.getTimestamp());
8983
return DeltaHistoryManager.getVersionAtOrAfterTimestamp(
9084
engine,
9185
logPath,
92-
startBoundary.getTimestamp(),
93-
(SnapshotImpl) startBoundary.getLatestSnapshot(),
86+
ctx.startBoundary.getTimestamp(),
87+
(SnapshotImpl) ctx.startBoundary.getLatestSnapshot(),
9488
catalogCommits);
9589
}
9690
}
@@ -143,7 +137,7 @@ private void logResolvedVersions(long startVersion, Optional<Long> endVersionOpt
143137
tablePath,
144138
startVersion,
145139
endVersionOpt,
146-
ctx.startBoundaryOpt,
140+
ctx.startBoundary,
147141
ctx.endBoundaryOpt);
148142
}
149143

kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@
4141
public class CommitRangeImpl implements CommitRange {
4242

4343
private final Path dataPath;
44-
private final Optional<CommitRangeBuilder.CommitBoundary> startBoundaryOpt;
44+
private final CommitRangeBuilder.CommitBoundary startBoundary;
4545
private final Optional<CommitRangeBuilder.CommitBoundary> endBoundaryOpt;
4646

4747
private final long startVersion;
@@ -50,7 +50,7 @@ public class CommitRangeImpl implements CommitRange {
5050

5151
public CommitRangeImpl(
5252
Path dataPath,
53-
Optional<CommitRangeBuilder.CommitBoundary> startBoundaryOpt,
53+
CommitRangeBuilder.CommitBoundary startBoundary,
5454
Optional<CommitRangeBuilder.CommitBoundary> endBoundaryOpt,
5555
long startVersion,
5656
long endVersion,
@@ -59,7 +59,7 @@ public CommitRangeImpl(
5959
checkArgument(
6060
deltas.size() == endVersion - startVersion + 1, "deltaFiles size must match size of range");
6161
this.dataPath = requireNonNull(dataPath, "dataPath cannot be null");
62-
this.startBoundaryOpt = requireNonNull(startBoundaryOpt, "startSpecOpt cannot be null");
62+
this.startBoundary = requireNonNull(startBoundary, "startBoundary cannot be null");
6363
this.endBoundaryOpt = requireNonNull(endBoundaryOpt, "endSpecOpt cannot be null");
6464
this.startVersion = startVersion;
6565
this.endVersion = endVersion;
@@ -81,8 +81,8 @@ public long getEndVersion() {
8181
}
8282

8383
@Override
84-
public Optional<CommitRangeBuilder.CommitBoundary> getQueryStartBoundary() {
85-
return startBoundaryOpt;
84+
public CommitRangeBuilder.CommitBoundary getQueryStartBoundary() {
85+
return startBoundary;
8686
}
8787

8888
@Override

0 commit comments

Comments
 (0)