Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ lazy val sparkV1Filtered = (project in file("spark-v1-filtered"))
lazy val sparkV2 = (project in file("kernel-spark"))
.dependsOn(sparkV1Filtered)
.dependsOn(kernelDefaults)
.dependsOn(kernelUnityCatalog)
.dependsOn(goldenTables % "test")
.settings(
name := "delta-spark-v2",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.spark.catalog;

import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import java.util.Optional;

/**
* Generic interface for catalog-managed commit coordination.
*
* <p>This interface abstracts catalog-specific implementations (Unity Catalog, AWS Glue, Apache
* Polaris) and provides a uniform API for loading snapshots and checking versions.
*
* <p>Implementations are provided by catalog modules and discovered through Spark's catalog system
* via the CatalogWithManagedCommits trait.
*/
public interface ManagedCommitClient extends AutoCloseable {

/**
* Loads a snapshot of the table.
*
* @param engine Delta Kernel engine for reading Parquet/JSON
* @param tableId catalog-specific table identifier
* @param tablePath filesystem path to table data
* @param version specific version to load (empty = latest)
* @param timestampMillis timestamp for time travel (empty = not used)
* @return snapshot at specified version/timestamp
*/
Snapshot getSnapshot(
Engine engine,
String tableId,
String tablePath,
Optional<Long> version,
Optional<Long> timestampMillis);

/**
* Checks if a specific version exists and is accessible.
*
* @param tableId catalog-specific table identifier
* @param version version to check
* @return true if version exists, false otherwise
*/
boolean versionExists(String tableId, long version);

/**
* Gets the latest version number available.
*
* @param tableId catalog-specific table identifier
* @return latest version number
*/
long getLatestVersion(String tableId);

/** Releases catalog client resources. */
@Override
void close() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.spark.catalog;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
import io.delta.storage.commit.GetCommitsResponse;
import io.delta.storage.commit.uccommitcoordinator.UCClient;
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorException;
import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.fs.Path;

/**
* Adapter that wraps Unity Catalog's UCCatalogManagedClient to implement the generic
* ManagedCommitClient interface.
*
* <p>This adapter isolates all UC-specific code in the UC module, keeping kernel-spark
* catalog-agnostic.
*/
public class UCManagedCommitClientAdapter implements ManagedCommitClient {

private final UCCatalogManagedClient ucClient;
private final UCClient rawUCClient;
private final String tablePath;

public UCManagedCommitClientAdapter(
UCCatalogManagedClient ucClient, UCClient rawUCClient, String tablePath) {
this.ucClient = requireNonNull(ucClient, "ucClient is null");
this.rawUCClient = requireNonNull(rawUCClient, "rawUCClient is null");
this.tablePath = requireNonNull(tablePath, "tablePath is null");
}

@Override
public Snapshot getSnapshot(
Engine engine,
String tableId,
String tablePath,
Optional<Long> version,
Optional<Long> timestampMillis) {

// Delegate to UC-specific implementation
return ucClient.loadSnapshot(engine, tableId, tablePath, version, timestampMillis);
}

@Override
public boolean versionExists(String tableId, long version) {
try {
// Try to get commits for this specific version
// If the version doesn't exist, UC will return an empty response or throw an exception
GetCommitsResponse response =
rawUCClient.getCommits(
tableId, new Path(tablePath).toUri(), Optional.of(version), Optional.of(version));

// If we get a response with commits, or if latestTableVersion >= version, it exists
return !response.getCommits().isEmpty() || response.getLatestTableVersion() >= version;
} catch (IOException | UCCommitCoordinatorException e) {
// If there's an error, assume the version doesn't exist
return false;
}
}

@Override
public long getLatestVersion(String tableId) {
try {
// Get commits without specifying an end version to get the latest table version
GetCommitsResponse response =
rawUCClient.getCommits(
tableId, new Path(tablePath).toUri(), Optional.empty(), Optional.empty());

long latestVersion = response.getLatestTableVersion();
// UC returns -1 if only 0.json exists (table just created)
return latestVersion == -1 ? 0 : latestVersion;
} catch (IOException | UCCommitCoordinatorException e) {
throw new RuntimeException("Failed to get latest version for table " + tableId, e);
}
}

@Override
public void close() throws Exception {
rawUCClient.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.spark.catalog

import java.util.Optional

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.connector.catalog.CatalogPlugin

/**
* Extension trait for catalogs that support Delta catalog-managed commits.
*
* Catalogs implementing this trait can provide centralized commit coordination
* for Delta tables, enabling features like cross-table transactions, stronger
* consistency guarantees, and centralized auditing.
*
* This is a Delta-specific extension to Spark's CatalogPlugin, not part of
* Spark Core.
*/
trait CatalogWithManagedCommits extends CatalogPlugin {

/**
* Returns a client for managing commits for the specified table.
*
* @param tableId catalog-specific table identifier
* @param tablePath filesystem path to table data
* @return ManagedCommitClient if table uses managed commits, empty otherwise
*/
def getManagedCommitClient(
tableId: String,
tablePath: String): Optional[ManagedCommitClient]

/**
* Extracts the catalog-specific table identifier from table properties.
*
* Different catalogs use different property keys:
* - Unity Catalog: "delta.coordinatedCommits.tableId"
* - AWS Glue: "glue.tableId" (future)
* - Polaris: "polaris.tableId" (future)
*
* @param table the Spark catalog table
* @return table identifier if managed by this catalog, empty otherwise
*/
def extractTableId(table: CatalogTable): Optional[String]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import java.util.Optional

import io.delta.kernel.spark.catalog.{CatalogWithManagedCommits, ManagedCommitClient, UCManagedCommitClientAdapter}
import io.delta.kernel.unitycatalog.UCCatalogManagedClient
import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.connector.catalog.CatalogPlugin

/**
* Unity Catalog implementation that supports Delta catalog-managed commits.
*
* This class wraps Unity Catalog functionality and implements the CatalogWithManagedCommits
* trait to enable Delta Kernel to work with Unity Catalog tables using managed commits.
*/
class UnityCatalog extends CatalogPlugin with CatalogWithManagedCommits with Logging {

private var catalogName: String = _

override def name(): String = catalogName

override def initialize(
name: String,
options: org.apache.spark.sql.util.CaseInsensitiveStringMap): Unit = {
this.catalogName = name
}

override def getManagedCommitClient(
tableId: String,
tablePath: String): Optional[ManagedCommitClient] = {

try {
// Get UC catalog configuration from Spark session
val spark = org.apache.spark.sql.SparkSession.active
val catalogConfig = resolveCatalogConfig(spark, catalogName)

if (catalogConfig.isEmpty) {
logWarning(s"No UC configuration found for catalog $catalogName")
return Optional.empty()
}

val (uri, token) = catalogConfig.get

// Create UC REST client
val ucClient = new UCTokenBasedRestClient(uri, token)
val ucManagedClient = new UCCatalogManagedClient(ucClient)

// Wrap in generic adapter
val adapter = new UCManagedCommitClientAdapter(ucManagedClient, ucClient, tablePath)
Optional.of(adapter)
} catch {
case e: Exception =>
logWarning(s"Failed to create UC managed commit client for table $tableId at $tablePath", e)
Optional.empty()
}
}

override def extractTableId(table: CatalogTable): Optional[String] = {
val properties = table.storage.properties

// Try the kernel UC table ID key first
val kernelKey = "catalogManaged.unityCatalog.tableId"
val tableIdOpt = properties.get(kernelKey)
.orElse(properties.get("ucTableId")) // Fallback to legacy key
.filter(_.nonEmpty)

tableIdOpt match {
case Some(id: String) => Optional.of(id)
case _ => Optional.empty[String]()
}
}

/**
* Resolves Unity Catalog configuration from Spark session.
*
* Reads the following configuration keys:
* - spark.sql.catalog.{catalogName}.uri
* - spark.sql.catalog.{catalogName}.token
*
* @param spark SparkSession to read configuration from
* @param catalogName name of the catalog
* @return Option containing (uri, token) if both are configured, None otherwise
*/
private def resolveCatalogConfig(
spark: org.apache.spark.sql.SparkSession,
catalogName: String): Option[(String, String)] = {

val uriKey = s"spark.sql.catalog.$catalogName.uri"
val tokenKey = s"spark.sql.catalog.$catalogName.token"

val uri = spark.conf.getOption(uriKey)
val token = spark.conf.getOption(tokenKey)

(uri, token) match {
case (Some(u), Some(t)) if u.nonEmpty && t.nonEmpty => Some((u, t))
case _ =>
logWarning(s"Missing or empty UC configuration for catalog $catalogName. " +
s"Required: $uriKey and $tokenKey")
None
}
}
}
Loading
Loading