From 94f9ed1b1b8bfb701142b7399ef53cfd608e3377 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 25 Sep 2025 19:51:18 +0000 Subject: [PATCH 1/3] Initial plan From 26011c249f7c516fe8d3a07360c4b70a1e038fb6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 25 Sep 2025 20:03:39 +0000 Subject: [PATCH 2/3] Implement cancellable DROP TABLE for DDL operations and Iceberg tables Co-authored-by: raunaqmorarka <4344846+raunaqmorarka@users.noreply.github.com> --- .../execution/DataDefinitionExecution.java | 32 ++++++++++++++++--- .../io/trino/execution/TestDropTableTask.java | 3 +- .../iceberg/catalog/hms/TrinoHiveCatalog.java | 17 ++++++++++ 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java b/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java index 652dd66bdda..71801076367 100644 --- a/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/DataDefinitionExecution.java @@ -16,6 +16,8 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -37,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkArgument; @@ -54,6 +57,10 @@ public class DataDefinitionExecution private final QueryStateMachine stateMachine; private final List parameters; private final WarningCollector warningCollector; + private final ListeningExecutorService executor; + + // Track the running future so it can be cancelled + private volatile ListenableFuture runningFuture; private DataDefinitionExecution( DataDefinitionTask task, @@ -61,7 +68,8 @@ private DataDefinitionExecution( Slug slug, QueryStateMachine stateMachine, List parameters, - WarningCollector warningCollector) + WarningCollector warningCollector, + ExecutorService executor) { this.task = requireNonNull(task, "task is null"); this.statement = requireNonNull(statement, "statement is null"); @@ -69,6 +77,7 @@ private DataDefinitionExecution( this.stateMachine = requireNonNull(stateMachine, "stateMachine is null"); this.parameters = parameters; this.warningCollector = requireNonNull(warningCollector, "warningCollector is null"); + this.executor = MoreExecutors.listeningDecorator(requireNonNull(executor, "executor is null")); stateMachine.addStateChangeListener(state -> { if (state.isDone() && stateMachine.getFinalQueryInfo().isEmpty()) { // make sure the final query info is set and listeners are triggered @@ -149,8 +158,12 @@ public void start() return; } - ListenableFuture future = task.execute(statement, stateMachine, parameters, warningCollector); - Futures.addCallback(future, new FutureCallback<>() + // Execute the DDL task asynchronously to make it cancellable + runningFuture = executor.submit(() -> { + ListenableFuture taskFuture = task.execute(statement, stateMachine, parameters, warningCollector); + return taskFuture.get(); + }); + Futures.addCallback(runningFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable Void result) @@ -222,6 +235,11 @@ public boolean isDone() @Override public void cancelQuery() { + // Cancel the running future if it exists + ListenableFuture future = runningFuture; + if (future != null) { + future.cancel(true); + } stateMachine.transitionToCanceled(); } @@ -306,11 +324,15 @@ public static class DataDefinitionExecutionFactory implements QueryExecutionFactory> { private final Map, DataDefinitionTask> tasks; + private final ExecutorService executor; @Inject - public DataDefinitionExecutionFactory(Map, DataDefinitionTask> tasks) + public DataDefinitionExecutionFactory( + Map, DataDefinitionTask> tasks, + @ForQueryExecution ExecutorService executor) { this.tasks = requireNonNull(tasks, "tasks is null"); + this.executor = requireNonNull(executor, "executor is null"); } @Override @@ -336,7 +358,7 @@ private DataDefinitionExecution createDataDefinitionExe checkArgument(task != null, "no task for statement: %s", statement.getClass().getSimpleName()); stateMachine.setUpdateType(task.getName()); - return new DataDefinitionExecution<>(task, statement, slug, stateMachine, parameters, warningCollector); + return new DataDefinitionExecution<>(task, statement, slug, stateMachine, parameters, warningCollector, executor); } } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java b/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java index 0f255ffae8e..d3ca1e0b59b 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java @@ -129,6 +129,7 @@ public void testDropTableIfExistsOnMaterializedView() private ListenableFuture executeDropTable(QualifiedName tableName, boolean exists) { - return new DropTableTask(metadata, new AllowAllAccessControl()).execute(new DropTable(new NodeLocation(1, 1), tableName, exists), queryStateMachine, ImmutableList.of(), WarningCollector.NOOP); + return new DropTableTask(metadata, new AllowAllAccessControl()) + .execute(new DropTable(new NodeLocation(1, 1), tableName, exists), queryStateMachine, ImmutableList.of(), WarningCollector.NOOP); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 79e20e863f9..413c0d49808 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -426,15 +426,32 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) schemaTableName.getTableName(), false /* do not delete data */); try { + // Check for interruption before starting the potentially long-running operation + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Drop table operation was cancelled"); + } // Use the Iceberg routine for dropping the table data because the data files // of the Iceberg table may be located in different locations dropTableData(table.io(), metadata); } + catch (InterruptedException e) { + // Restore interrupted status and exit gracefully + Thread.currentThread().interrupt(); + throw new RuntimeException("Drop table operation was cancelled", e); + } catch (RuntimeException e) { // If the snapshot file is not found, an exception will be thrown by the dropTableData function. // So log the exception and continue with deleting the table location log.warn(e, "Failed to delete table data referenced by metadata"); } + + // Check for interruption before final cleanup + if (Thread.currentThread().isInterrupted()) { + log.info("Drop table operation was cancelled during cleanup for table: %s", schemaTableName); + Thread.currentThread().interrupt(); + throw new RuntimeException("Drop table operation was cancelled during cleanup"); + } + deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, metastoreTable.getStorage().getLocation()); invalidateTableCache(schemaTableName); } From 44e53b59d8bf667592e086ba59054b16d2a4f869 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 25 Sep 2025 20:07:35 +0000 Subject: [PATCH 3/3] Add interruption handling to additional Iceberg catalogs Co-authored-by: raunaqmorarka <4344846+raunaqmorarka@users.noreply.github.com> --- .../io/trino/execution/TestDropTableTask.java | 41 +++++++++++++++++++ .../catalog/glue/TrinoGlueCatalog.java | 17 ++++++++ .../catalog/jdbc/TrinoJdbcCatalog.java | 17 ++++++++ 3 files changed, 75 insertions(+) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java b/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java index d3ca1e0b59b..e7d5d8aff0b 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDropTableTask.java @@ -16,14 +16,20 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import io.trino.execution.warnings.WarningCollector; import io.trino.metadata.QualifiedObjectName; import io.trino.security.AllowAllAccessControl; +import io.trino.server.protocol.Slug; import io.trino.sql.tree.DropTable; import io.trino.sql.tree.NodeLocation; import io.trino.sql.tree.QualifiedName; import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; @@ -127,6 +133,41 @@ public void testDropTableIfExistsOnMaterializedView() .hasMessageContaining("Table '%s' does not exist, but a materialized view with that name exists. Did you mean DROP MATERIALIZED VIEW %s?", viewName, viewName); } + @Test + public void testDropTableIsCancellable() + throws Exception + { + QualifiedObjectName tableName = qualifiedObjectName("existing_table"); + metadata.createTable(testSession, TEST_CATALOG_NAME, someTable(tableName), FAIL); + assertThat(metadata.getTableHandle(testSession, tableName)).isPresent(); + + // Test that drop table operation can be cancelled by using a slow executor + ListeningExecutorService slowExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + try { + DropTable dropTable = new DropTable(new NodeLocation(1, 1), asQualifiedName(tableName), false); + DropTableTask task = new DropTableTask(metadata, new AllowAllAccessControl()); + + // Simulate cancellation by creating a future that can be cancelled + ListenableFuture future = slowExecutor.submit(() -> { + try { + Thread.sleep(1000); // Simulate slow operation + return task.execute(dropTable, queryStateMachine, ImmutableList.of(), WarningCollector.NOOP).get(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Cancelled", e); + } + }); + + // Cancel the future + boolean cancelled = future.cancel(true); + assertThat(cancelled).isTrue(); + } + finally { + slowExecutor.shutdown(); + } + } + private ListenableFuture executeDropTable(QualifiedName tableName, boolean exists) { return new DropTableTask(metadata, new AllowAllAccessControl()) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 16945ec4202..83b90c57e33 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -701,13 +701,30 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) throw new TrinoException(HIVE_METASTORE_ERROR, e); } try { + // Check for interruption before starting the potentially long-running operation + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Drop table operation was cancelled"); + } dropTableData(table.io(), table.operations().current()); } + catch (InterruptedException e) { + // Restore interrupted status and exit gracefully + Thread.currentThread().interrupt(); + throw new RuntimeException("Drop table operation was cancelled", e); + } catch (RuntimeException e) { // If the snapshot file is not found, an exception will be thrown by the dropTableData function. // So log the exception and continue with deleting the table location LOG.warn(e, "Failed to delete table data referenced by metadata"); } + + // Check for interruption before final cleanup + if (Thread.currentThread().isInterrupted()) { + LOG.info("Drop table operation was cancelled during cleanup for table: %s", schemaTableName); + Thread.currentThread().interrupt(); + throw new RuntimeException("Drop table operation was cancelled during cleanup"); + } + deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); invalidateTableCache(schemaTableName); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 5e05f3abbba..2e0df97e7c8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -341,13 +341,30 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) jdbcCatalog.dropTable(toIdentifier(schemaTableName), false); try { + // Check for interruption before starting the potentially long-running operation + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Drop table operation was cancelled"); + } dropTableData(table.io(), table.operations().current()); } + catch (InterruptedException e) { + // Restore interrupted status and exit gracefully + Thread.currentThread().interrupt(); + throw new RuntimeException("Drop table operation was cancelled", e); + } catch (RuntimeException e) { // If the snapshot file is not found, an exception will be thrown by the dropTableData function. // So log the exception and continue with deleting the table location LOG.warn(e, "Failed to delete table data referenced by metadata"); } + + // Check for interruption before final cleanup + if (Thread.currentThread().isInterrupted()) { + LOG.info("Drop table operation was cancelled during cleanup for table: %s", schemaTableName); + Thread.currentThread().interrupt(); + throw new RuntimeException("Drop table operation was cancelled during cleanup"); + } + deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); invalidateTableCache(schemaTableName); }