Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public InMemoryUCClient createUCClient(Engine engine, String tableRoot) throws I
InMemoryUCClient ucClient = new InMemoryUCClient("benchmark-metastore");
InMemoryUCClient.TableData tableData =
new InMemoryUCClient.TableData(maxRatifiedVersion, commits);
ucClient.createTableIfNotExistsOrThrow(ucTableId, tableData);
ucClient.insertTableData(ucTableId, tableData);

return ucClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ public Snapshot loadSnapshot(

metricsCollector.setNumCatalogCommits(response.getCommits().size());

final long ucTableVersion =
getTrueUCTableVersion(ucTableId, response.getLatestTableVersion());
final long maxUcTableVersion = response.getLatestTableVersion();

versionOpt.ifPresent(
version -> validateLoadTableVersionExists(ucTableId, version, ucTableVersion));
version ->
validateTimeTravelVersionNotPastMax(ucTableId, version, maxUcTableVersion));

final List<ParsedLogData> logData =
getSortedKernelParsedDeltaDataFromRatifiedCommits(
ucTableId, response.getCommits());
Expand All @@ -149,7 +151,11 @@ public Snapshot loadSnapshot(
.timeChecked(
() ->
loadLatestSnapshotForTimestampResolution(
engine, ucTableId, tablePath, logData, ucTableVersion));
engine,
ucTableId,
tablePath,
logData,
maxUcTableVersion));
snapshotBuilder =
snapshotBuilder.atTimestamp(timestampOpt.get(), latestSnapshot);
}
Expand All @@ -158,7 +164,7 @@ public Snapshot loadSnapshot(
snapshotBuilder
.withCommitter(createUCCommitter(ucClient, ucTableId, tablePath))
.withLogData(logData)
.withMaxCatalogVersion(ucTableVersion)
.withMaxCatalogVersion(maxUcTableVersion)
.build(engine);
metricsCollector.setResolvedSnapshotVersion(snapshot.getVersion());
return snapshot;
Expand Down Expand Up @@ -267,7 +273,7 @@ public CommitRange loadCommitRange(
endVersionOpt.filter(v -> !startTimestampOpt.isPresent());
final GetCommitsResponse response =
getRatifiedCommitsFromUC(ucTableId, tablePath, endVersionOptForCommitQuery);
final long ucTableVersion = getTrueUCTableVersion(ucTableId, response.getLatestTableVersion());
final long ucTableVersion = response.getLatestTableVersion();
validateVersionBoundariesExist(ucTableId, startVersionOpt, endVersionOpt, ucTableVersion);
final List<ParsedLogData> logData =
getSortedKernelParsedDeltaDataFromRatifiedCommits(ucTableId, response.getCommits());
Expand Down Expand Up @@ -404,22 +410,7 @@ private GetCommitsResponse getRatifiedCommitsFromUC(
return response;
}

// TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here.
/**
* As of this writing, UC catalog service is not informed when 0.json is successfully written
* during table creation. Thus, when 0.json exists, the max ratified version returned by UC is -1.
*/
private long getTrueUCTableVersion(String ucTableId, long maxRatifiedVersion) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only real change here. Everything else is just test refactors. Thought I'd bundle them all together since it's a rather small PR

if (maxRatifiedVersion == -1) {
logger.info(
"[{}] UC max ratified version is -1. This means 0.json exists. Version is 0.", ucTableId);
return 0;
}

return maxRatifiedVersion;
}

private void validateLoadTableVersionExists(
private void validateTimeTravelVersionNotPastMax(
String ucTableId, long tableVersionToLoad, long maxRatifiedVersion) {
if (tableVersionToLoad > maxRatifiedVersion) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ object InMemoryUCClient {
* concurrently.
*/
class TableData(
private var maxRatifiedVersion: Long = -1L,
private val commits: ArrayBuffer[Commit] = ArrayBuffer.empty) {
private var maxRatifiedVersion: Long,
private val commits: ArrayBuffer[Commit]) {

// For test only, since UC doesn't store these as top-level entities.
private var currentProtocolOpt: Option[AbstractProtocol] = None
private var currentMetadataOpt: Option[AbstractMetadata] = None

/** @return the maximum ratified version, or -1 if no commits have been made. */
/** @return the maximum ratified version. */
def getMaxRatifiedVersion: Long = synchronized { maxRatifiedVersion }

/** @return An immutable list of all commits. */
Expand Down Expand Up @@ -74,9 +74,7 @@ object InMemoryUCClient {
commit: Commit,
newProtocol: Optional[AbstractProtocol] = Optional.empty(),
newMetadata: Optional[AbstractMetadata] = Optional.empty()): Unit = synchronized {
// TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here.
// For UC, commit 0 is expected to go through the filesystem
val expectedCommitVersion = if (maxRatifiedVersion == -1L) 1 else maxRatifiedVersion + 1
val expectedCommitVersion = maxRatifiedVersion + 1

if (commit.getVersion != expectedCommitVersion) {
throw new CommitFailedException(
Expand All @@ -103,6 +101,10 @@ object InMemoryUCClient {
}
}
}

object TableData {
def afterCreate(): TableData = new TableData(0, ArrayBuffer.empty[Commit])
}
}

/**
Expand Down Expand Up @@ -186,9 +188,12 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient {
/** Visible for testing. Can be overridden to force an exception in commit method. */
protected def forceThrowInCommitMethod(): Unit = {}

private[unitycatalog] def createTableIfNotExistsOrThrow(
ucTableId: String,
tableData: TableData): Unit = {
private[unitycatalog] def insertTableDataAfterCreate(ucTableId: String): Unit = {
Option(tables.putIfAbsent(ucTableId, TableData.afterCreate()))
.foreach(_ => throw new IllegalArgumentException(s"Table $ucTableId already exists"))
}

private[unitycatalog] def insertTableData(ucTableId: String, tableData: TableData): Unit = {
Option(tables.putIfAbsent(ucTableId, tableData))
.foreach(_ => throw new IllegalArgumentException(s"Table $ucTableId already exists"))
}
Expand All @@ -205,6 +210,6 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient {

/** Retrieves the table data for the given table ID, creating it if it does not exist. */
private def getOrCreateTableIfNotExists(tableId: String): TableData = {
tables.computeIfAbsent(tableId, _ => new TableData)
tables.computeIfAbsent(tableId, _ => TableData.afterCreate())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import java.net.URI
import java.util.Optional

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import io.delta.storage.commit.CommitFailedException
import io.delta.storage.commit.{Commit, CommitFailedException}
import io.delta.storage.commit.uccommitcoordinator.InvalidTargetTableException

import org.scalatest.funsuite.AnyFunSuite
Expand All @@ -42,21 +43,9 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {
assert(actualVersions == expectedVersions)
}

// TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here.
test("TableData::appendCommit throws on commit v0 (since CREATE does not go through UC)") {
val tableData = new InMemoryUCClient.TableData

val exMsg = intercept[CommitFailedException] {
tableData.appendCommit(createCommit(0L))
}.getMessage

assert(exMsg.contains("Expected commit version 1 but got 0"))
}

// TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here.
test("TableData::appendCommit handles commit version 1 (since CREATE does not go through UC)") {
val tableData = new InMemoryUCClient.TableData
assert(tableData.getMaxRatifiedVersion == -1L)
val tableData = InMemoryUCClient.TableData.afterCreate()
assert(tableData.getMaxRatifiedVersion == 0L)

tableData.appendCommit(createCommit(1L))

Expand All @@ -65,9 +54,8 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {
assert(tableData.getCommits.head.getVersion == 1L)
}

test("TableData::appendCommit throws if commit version is not maxRatifiedVersion + 1 " +
"(excluding v1 edge case)") {
val tableData = new InMemoryUCClient.TableData
test("TableData::appendCommit throws if commit version is not maxRatifiedVersion + 1") {
val tableData = InMemoryUCClient.TableData.afterCreate()
tableData.appendCommit(createCommit(1L))

val exMsg = intercept[CommitFailedException] {
Expand All @@ -78,7 +66,7 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils {
}

test("TableData::appendCommit appends the commit and updates the maxRatifiedVersion") {
val tableData = new InMemoryUCClient.TableData
val tableData = InMemoryUCClient.TableData.afterCreate()
tableData.appendCommit(createCommit(1L))

assert(tableData.getMaxRatifiedVersion == 1L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,10 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM
assert(ex.getCause.isInstanceOf[InvalidTargetTableException])
}

test("loadCommitRange for new table when UC maxRatifiedVersion is -1") {
test("loadCommitRange for new table when UC maxRatifiedVersion is 0") {
val tablePath = getTestResourceFilePath("catalog-owned-preview")
val ucCatalogManagedClient =
createUCCatalogManagedClientForTableWithMaxRatifiedVersionNegativeOne()
val commitRange = loadCommitRange(
ucCatalogManagedClient,
tablePath = tablePath)
val ucCatalogManagedClient = createUCCatalogManagedClientForTableAfterCreate()
val commitRange = loadCommitRange(ucCatalogManagedClient, tablePath = tablePath)

assert(commitRange.getStartVersion == 0)
assert(commitRange.getEndVersion == 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,9 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU
(javaLongOpt(0L), emptyLongOpt, "v0 (explicitly by version)"),
(emptyLongOpt, javaLongOpt(1749830855993L), "v0 (explicitly by timestamp")).foreach {
case (versionToLoad, timestampToLoad, description) =>
test(s"table version 0 is loaded when UC maxRatifiedVersion is -1 -- $description") {
test(s"table version 0 is loaded when UC maxRatifiedVersion is 0 -- $description") {
val tablePath = getTestResourceFilePath("catalog-owned-preview")
val ucCatalogManagedClient =
createUCCatalogManagedClientForTableWithMaxRatifiedVersionNegativeOne()
val ucCatalogManagedClient = createUCCatalogManagedClientForTableAfterCreate()
val snapshot = loadSnapshot(
ucCatalogManagedClient,
tablePath = tablePath,
Expand Down Expand Up @@ -263,8 +262,7 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU

test("creates snapshot with UCCatalogManagedCommitter") {
val tablePath = getTestResourceFilePath("catalog-owned-preview")
val ucCatalogManagedClient =
createUCCatalogManagedClientForTableWithMaxRatifiedVersionNegativeOne()
val ucCatalogManagedClient = createUCCatalogManagedClientForTableAfterCreate()
val snapshot =
loadSnapshot(ucCatalogManagedClient, tablePath = tablePath, versionToLoad = Optional.of(0L))
assert(snapshot.getCommitter.isInstanceOf[UCCatalogManagedCommitter])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ class UCCatalogManagedCommitterSuite
withTempDirAndAllDeltaSubDirs { case (tablePath, logPath) =>
// ===== GIVEN =====
val ucClient = new InMemoryUCClient("ucMetastoreId")
ucClient
.createTableIfNotExistsOrThrow(testUcTableId, new TableData(-1, ArrayBuffer[Commit]()))
ucClient.insertTableDataAfterCreate(testUcTableId)
val committer = new UCCatalogManagedCommitter(ucClient, testUcTableId, tablePath)

// ===== WHEN =====
Expand Down Expand Up @@ -244,11 +243,8 @@ class UCCatalogManagedCommitterSuite
test("CATALOG_WRITE: writes staged commit file and invokes UC client commit API (no P&M change") {
withTempDirAndAllDeltaSubDirs { case (tablePath, logPath) =>
// ===== GIVEN =====
// Set up UC client with initial table with maxRatifiedVersion = -1, numCommits = 0. This
// represents a table that was just created and at version 0. We will then commit version 1.
val ucClient = new InMemoryUCClient("ucMetastoreId")
val tableData = new TableData(-1, ArrayBuffer[Commit]())
ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData)
ucClient.insertTableDataAfterCreate(testUcTableId)

val testValue = "TEST_COMMIT_DATA_12345"
val actionsIterator = getSingleElementRowIter(testValue)
Expand Down Expand Up @@ -304,7 +300,7 @@ class UCCatalogManagedCommitterSuite

val ucClient = new InMemoryUCClient("ucMetastoreId")
val tableData = new TableData(maxRatifiedVersion = 1, commits = ArrayBuffer.empty[Commit])
ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData)
ucClient.insertTableData(testUcTableId, tableData)
val committer = new UCCatalogManagedCommitter(ucClient, testUcTableId, tablePath)
val commitMetadata = catalogManagedWriteCommitMetadata(2, logPath = logPath)

Expand All @@ -331,7 +327,7 @@ class UCCatalogManagedCommitterSuite
null)
}
val tableData = new TableData(maxRatifiedVersion = 1, commits = ArrayBuffer.empty[Commit])
ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData)
ucClient.insertTableData(testUcTableId, tableData)
val committer = new UCCatalogManagedCommitter(ucClient, testUcTableId, tablePath)
val commitMetadata = catalogManagedWriteCommitMetadata(2, logPath = logPath)
// ===== WHEN =====
Expand All @@ -352,7 +348,7 @@ class UCCatalogManagedCommitterSuite
override def forceThrowInCommitMethod(): Unit = throw new IOException("UC network error")
}
val tableData = new TableData(maxRatifiedVersion = 1, commits = ArrayBuffer.empty[Commit])
ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData)
ucClient.insertTableData(testUcTableId, tableData)
val committer = new UCCatalogManagedCommitter(ucClient, testUcTableId, tablePath)
val commitMetadata = catalogManagedWriteCommitMetadata(2, logPath = logPath)

Expand All @@ -377,7 +373,7 @@ class UCCatalogManagedCommitterSuite
}
}
val tableData = new TableData(maxRatifiedVersion = 1, commits = ArrayBuffer.empty[Commit])
ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData)
ucClient.insertTableData(testUcTableId, tableData)
val committer = new UCCatalogManagedCommitter(ucClient, "unknownTableId", tablePath)
val commitMetadata = catalogManagedWriteCommitMetadata(2, logPath = logPath)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,6 @@ trait UCCatalogManagedTestUtils
(ucClient, ucCatalogManagedClient)
}

/**
* Initializes a UC table in the InMemoryUCClient after creation.
* This should be called after creating a table with buildCreateTableTransaction.
*/
def initializeUCTable(ucClient: InMemoryUCClient, ucTableId: String): Unit = {
val tableData =
new InMemoryUCClient.TableData(-1, scala.collection.mutable.ArrayBuffer[Commit]())
ucClient.createTableIfNotExistsOrThrow(ucTableId, tableData)
}

/** Version TS for the test table used in [[withUCClientAndTestTable]] */
val v0Ts = 1749830855993L // published commit
val v1Ts = 1749830871085L // ratified staged commit
Expand Down Expand Up @@ -189,20 +179,14 @@ trait UCCatalogManagedTestUtils
fileStatus.getModificationTime)
}
val tableData = new TableData(maxRatifiedVersion, ArrayBuffer(catalogCommits: _*))
ucClient.createTableIfNotExistsOrThrow("testUcTableId", tableData)
ucClient.insertTableData("testUcTableId", tableData)
textFx(ucClient, tablePath, maxRatifiedVersion)
}

// TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here.
/**
* When a new UC table is created, it will have Delta version 0 but the max ratified verison in
* UC is -1. This is a special edge case.
*/
def createUCCatalogManagedClientForTableWithMaxRatifiedVersionNegativeOne(
def createUCCatalogManagedClientForTableAfterCreate(
ucTableId: String = "testUcTableId"): UCCatalogManagedClient = {
val ucClient = new InMemoryUCClient("ucMetastoreId")
val tableData = new TableData(-1, ArrayBuffer[Commit]())
ucClient.createTableIfNotExistsOrThrow(ucTableId, tableData)
ucClient.insertTableDataAfterCreate(ucTableId)
new UCCatalogManagedClient(ucClient)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils {
.buildCreateTableTransaction(testUcTableId, tablePath, testSchema, "test-engine")
.build(engine)
.commit(engine, CloseableIterable.emptyIterable() /* dataActions */ )
val tableData0 = new TableData(-1, ArrayBuffer[Commit]())
ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData0)
ucClient.insertTableDataAfterCreate(testUcTableId)
result0.getPostCommitSnapshot.get().publish(engine) // Should be no-op!

// Step 2: WRITE -- v1.uuid.json
Expand Down Expand Up @@ -131,8 +130,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils {
.buildCreateTableTransaction(testUcTableId, tablePath, testSchema, "test-engine")
.build(engine)
.commit(engine, CloseableIterable.emptyIterable())
val tableData0 = new TableData(-1, ArrayBuffer[Commit]())
ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData0)
ucClient.insertTableDataAfterCreate(testUcTableId)

var currentSnapshot = result0.getPostCommitSnapshot.get()

Expand Down Expand Up @@ -198,8 +196,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils {
.buildCreateTableTransaction(testUcTableId, tablePath, testSchema, "test-engine")
.build(engine)
.commit(engine, CloseableIterable.emptyIterable())
val tableData0 = new TableData(-1, ArrayBuffer[Commit]())
ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData0)
ucClient.insertTableDataAfterCreate(testUcTableId)

// Step 2: WRITE and commit data up to version 2
val postCommitSnapshot1 = writeDataAndVerify(
Expand Down
Loading
Loading