Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -609,12 +609,19 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName,
definition.getComment().ifPresent(comment -> properties.put(COMMENT, comment));
Schema schema = IcebergUtil.schemaFromViewColumns(typeManager, definition.getColumns());
ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toRemoteView(session, schemaViewName, true));
String tableLocation = defaultTableLocation(session, schemaViewName);
if (replace && useUniqueTableLocation) {
Optional<View> view = getIcebergView(session, schemaViewName, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will we fail to get the view if it isn't in the view cache?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will we fail to get the view if it isn't in the view cache?

I believe it will attempt to return from the cache, and if missing it will load it.
It reaches CacheUtils.uncheckedCacheGet, where it does "return cache.get(key, loader::get);"

if (view.isPresent()) {
tableLocation = view.get().location();
}
}
viewBuilder = viewBuilder.withSchema(schema)
.withQuery("trino", definition.getOriginalSql())
.withDefaultNamespace(toRemoteNamespace(session, toNamespace(schemaViewName.getSchemaName())))
.withDefaultCatalog(definition.getCatalog().orElse(null))
.withProperties(properties.buildOrThrow())
.withLocation(defaultTableLocation(session, schemaViewName));
.withLocation(tableLocation);
try {
if (replace) {
viewBuilder.createOrReplace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.cache.EvictableCacheBuilder;
import io.trino.metastore.TableInfo;
Expand All @@ -25,18 +26,23 @@
import io.trino.spi.TrinoException;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.security.PrincipalType;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.TestingTypeManager;
import io.trino.spi.type.VarcharType;
import org.apache.iceberg.rest.DelegatingRestSessionCatalog;
import org.apache.iceberg.rest.RESTSessionCatalog;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
Expand All @@ -59,13 +65,12 @@ public class TestTrinoRestCatalog
protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
throws IOException
{
return createTrinoRestCatalog(useUniqueTableLocations, ImmutableMap.of());
return createTrinoRestCatalog(useUniqueTableLocations, ImmutableMap.of(), Files.createTempDirectory(null));
}

private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLocations, Map<String, String> properties)
private static TrinoRestCatalog createTrinoRestCatalog(boolean useUniqueTableLocations, Map<String, String> properties, Path warehouseLocation)
throws IOException
{
Path warehouseLocation = Files.createTempDirectory(null);
warehouseLocation.toFile().deleteOnExit();

String catalogName = "iceberg_rest";
Expand Down Expand Up @@ -146,7 +151,7 @@ public void testNonLowercaseNamespace()
public void testPrefix()
throws Exception
{
TrinoCatalog catalog = createTrinoRestCatalog(false, ImmutableMap.of("prefix", "dev"));
TrinoCatalog catalog = createTrinoRestCatalog(false, ImmutableMap.of("prefix", "dev"), Files.createTempDirectory(null));

String namespace = "testPrefixNamespace" + randomNameSuffix();

Expand All @@ -168,4 +173,81 @@ protected TableInfo.ExtendedRelationType getViewType()
{
return OTHER_VIEW;
}

@Test
public void testCreateReplaceViewUniqueLocation()
throws IOException
{
Path warehouseLocation = Files.createTempDirectory("iceberg_catalog_test_create_view_");
TrinoRestCatalog catalog = createTrinoRestCatalog(true, ImmutableMap.of(), warehouseLocation);

String namespace = "test_create_view_" + randomNameSuffix();
String viewName = "viewName";
String renamedViewName = "renamedViewName";
SchemaTableName schemaTableName = new SchemaTableName(namespace, viewName);
SchemaTableName renamedSchemaTableName = new SchemaTableName(namespace, renamedViewName);
ConnectorViewDefinition viewDefinition = new ConnectorViewDefinition(
"SELECT name FROM local.tiny.nation",
Optional.empty(),
Optional.empty(),
ImmutableList.of(
new ConnectorViewDefinition.ViewColumn("name", VarcharType.createUnboundedVarcharType().getTypeId(), Optional.empty())),
Optional.empty(),
Optional.of(SESSION.getUser()),
false,
ImmutableList.of());

try {
catalog.createNamespace(SESSION, namespace, defaultNamespaceProperties(namespace), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser()));
catalog.createView(SESSION, schemaTableName, viewDefinition, false);

assertThat(catalog.listTables(SESSION, Optional.of(namespace)).stream()).contains(new TableInfo(schemaTableName, getViewType()));

Map<SchemaTableName, ConnectorViewDefinition> views = catalog.getViews(SESSION, Optional.of(schemaTableName.getSchemaName()));
assertThat(views).hasSize(1);
assertViewDefinition(views.get(schemaTableName), viewDefinition);
assertViewDefinition(catalog.getView(SESSION, schemaTableName).orElseThrow(), viewDefinition);

catalog.createView(SESSION, schemaTableName, viewDefinition, true);
assertThat(catalog.listTables(SESSION, Optional.of(namespace)).stream()).contains(new TableInfo(schemaTableName, getViewType()));
views = catalog.getViews(SESSION, Optional.of(schemaTableName.getSchemaName()));
assertThat(views).hasSize(1);
assertViewDefinition(views.get(schemaTableName), viewDefinition);
assertViewDefinition(catalog.getView(SESSION, schemaTableName).orElseThrow(), viewDefinition);

List<String> initialViewLocations = getViewLocations(warehouseLocation, namespace);
assertThat(initialViewLocations).hasSize(1);

catalog.renameView(SESSION, schemaTableName, renamedSchemaTableName);
assertThat(catalog.listTables(SESSION, Optional.of(namespace)).stream().map(TableInfo::tableName).toList()).doesNotContain(schemaTableName);
views = catalog.getViews(SESSION, Optional.of(schemaTableName.getSchemaName()));
assertThat(views).hasSize(1);
assertViewDefinition(views.get(renamedSchemaTableName), viewDefinition);
assertViewDefinition(catalog.getView(SESSION, renamedSchemaTableName).orElseThrow(), viewDefinition);
assertThat(catalog.getView(SESSION, schemaTableName)).isEmpty();

List<String> finalViewLocations = getViewLocations(warehouseLocation, namespace);
assertThat(finalViewLocations).hasSize(1);
assertThat(finalViewLocations.getFirst()).isEqualTo(initialViewLocations.getFirst());

catalog.dropView(SESSION, renamedSchemaTableName);
assertThat(catalog.listTables(SESSION, Optional.empty()).stream().map(TableInfo::tableName).toList())
.doesNotContain(renamedSchemaTableName);
}
finally {
catalog.dropNamespace(SESSION, namespace);
}
}

private List<String> getViewLocations(Path warehouseLocation, String namespace)
throws IOException
{
Path namespacePath = warehouseLocation.resolve("iceberg_data").resolve(namespace);
try (Stream<Path> stream = Files.list(namespacePath)) {
return stream
.map(Path::getFileName)
.map(Path::toString)
.toList();
}
}
}