Skip to content

Commit 6f313c9

Browse files
chenjian2664ebyhr
authored andcommitted
Introduce DeltaLakeFileSystemFactory and VendedCredentialsProvider in Delta Lake
1 parent 0fa04ac commit 6f313c9

File tree

67 files changed

+790
-354
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+790
-354
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import io.airlift.slice.Slice;
2424
import io.trino.filesystem.Location;
2525
import io.trino.filesystem.TrinoFileSystem;
26-
import io.trino.filesystem.TrinoFileSystemFactory;
2726
import io.trino.metastore.Partitions;
2827
import io.trino.parquet.writer.ParquetWriterOptions;
2928
import io.trino.plugin.deltalake.DataFileInfo.DataFileType;
29+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
3030
import io.trino.plugin.deltalake.util.DeltaLakeWriteUtils;
3131
import io.trino.plugin.hive.parquet.ParquetFileWriter;
3232
import io.trino.spi.Page;
@@ -118,10 +118,11 @@ public AbstractDeltaLakePageSink(
118118
List<DeltaLakeColumnHandle> inputColumns,
119119
List<String> originalPartitionColumns,
120120
PageIndexerFactory pageIndexerFactory,
121-
TrinoFileSystemFactory fileSystemFactory,
121+
DeltaLakeFileSystemFactory fileSystemFactory,
122122
int maxOpenWriters,
123123
JsonCodec<DataFileInfo> dataFileInfoCodec,
124124
Location tableLocation,
125+
VendedCredentialsHandle credentialsHandle,
125126
Location outputPathDirectory,
126127
ConnectorSession session,
127128
DeltaLakeWriterStats stats,
@@ -133,7 +134,7 @@ public AbstractDeltaLakePageSink(
133134

134135
requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
135136

136-
this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null").create(session);
137+
this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null").create(session, credentialsHandle);
137138
this.maxOpenWriters = maxOpenWriters;
138139
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
139140
this.parquetSchemaMapping = requireNonNull(parquetSchemaMapping, "parquetSchemaMapping is null");

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import io.airlift.units.DataSize;
1717
import io.trino.filesystem.TrinoFileSystem;
18-
import io.trino.filesystem.TrinoFileSystemFactory;
1918
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
2019
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
2120
import io.trino.plugin.deltalake.transactionlog.Transaction;
@@ -51,13 +50,13 @@ public abstract class BaseTransactionsTable
5150
implements SystemTable
5251
{
5352
private final DeltaMetastoreTable table;
54-
private final TrinoFileSystemFactory fileSystemFactory;
53+
private final DeltaLakeFileSystemFactory fileSystemFactory;
5554
private final TransactionLogAccess transactionLogAccess;
5655
private final ConnectorTableMetadata tableMetadata;
5756

5857
public BaseTransactionsTable(
5958
DeltaMetastoreTable table,
60-
TrinoFileSystemFactory fileSystemFactory,
59+
DeltaLakeFileSystemFactory fileSystemFactory,
6160
TransactionLogAccess transactionLogAccess,
6261
TypeManager typeManager,
6362
ConnectorTableMetadata tableMetadata)
@@ -84,12 +83,13 @@ public ConnectorTableMetadata getTableMetadata()
8483
@Override
8584
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
8685
{
86+
TrinoFileSystem fileSystem = fileSystemFactory.create(session, table);
8787
long snapshotVersion;
8888
try {
8989
// Verify the transaction log is readable
9090
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(session, table, Optional.empty());
9191
snapshotVersion = tableSnapshot.getVersion();
92-
transactionLogAccess.getMetadataEntry(session, tableSnapshot);
92+
transactionLogAccess.getMetadataEntry(session, fileSystem, tableSnapshot);
9393
}
9494
catch (IOException e) {
9595
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + table.location(), e);
@@ -135,7 +135,6 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
135135
endVersionInclusive = Optional.of(snapshotVersion);
136136
}
137137

138-
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
139138
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
140139
try {
141140
checkArgument(endVersionInclusive.isPresent(), "endVersionInclusive must be present");

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/CorruptedDeltaLakeTableHandle.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,21 @@
1313
*/
1414
package io.trino.plugin.deltalake;
1515

16+
import io.trino.plugin.deltalake.metastore.VendedCredentials;
17+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
1618
import io.trino.spi.TrinoException;
1719
import io.trino.spi.connector.SchemaTableName;
1820

21+
import java.util.Optional;
22+
1923
import static java.util.Objects.requireNonNull;
2024

2125
public record CorruptedDeltaLakeTableHandle(
2226
SchemaTableName schemaTableName,
27+
boolean catalogOwned,
2328
boolean managed,
2429
String location,
30+
Optional<VendedCredentials> vendedCredentials,
2531
TrinoException originalException)
2632
implements LocatedTableHandle
2733
{
@@ -37,4 +43,10 @@ public TrinoException createException()
3743
// Original exception originates from a different place. Create a new exception not to confuse reader with a stacktrace not matching call site.
3844
return new TrinoException(originalException::getErrorCode, originalException.getMessage(), originalException);
3945
}
46+
47+
@Override
48+
public VendedCredentialsHandle toCredentialsHandle()
49+
{
50+
return new VendedCredentialsHandle(catalogOwned, managed, location, vendedCredentials.orElse(VendedCredentials.empty()));
51+
}
4052
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import com.google.inject.Inject;
17+
import io.trino.filesystem.TrinoFileSystem;
18+
import io.trino.filesystem.TrinoFileSystemFactory;
19+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
20+
import io.trino.spi.connector.ConnectorSession;
21+
22+
import static java.util.Objects.requireNonNull;
23+
24+
public class DefaultDeltaLakeFileSystemFactory
25+
implements DeltaLakeFileSystemFactory
26+
{
27+
private final TrinoFileSystemFactory fileSystemFactory;
28+
29+
@Inject
30+
public DefaultDeltaLakeFileSystemFactory(TrinoFileSystemFactory fileSystemFactory)
31+
{
32+
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
33+
}
34+
35+
@Override
36+
public TrinoFileSystem create(ConnectorSession session, VendedCredentialsHandle table)
37+
{
38+
return fileSystemFactory.create(session.getIdentity());
39+
}
40+
41+
@Override
42+
public TrinoFileSystem create(ConnectorSession session, String tableLocation)
43+
{
44+
return fileSystemFactory.create(session.getIdentity());
45+
}
46+
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeCdfPageSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import io.airlift.json.JsonCodec;
1717
import io.trino.filesystem.Location;
18-
import io.trino.filesystem.TrinoFileSystemFactory;
18+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
1919
import io.trino.spi.PageIndexerFactory;
2020
import io.trino.spi.connector.ConnectorSession;
2121
import io.trino.spi.type.TypeOperators;
@@ -35,10 +35,11 @@ public DeltaLakeCdfPageSink(
3535
List<DeltaLakeColumnHandle> inputColumns,
3636
List<String> originalPartitionColumns,
3737
PageIndexerFactory pageIndexerFactory,
38-
TrinoFileSystemFactory fileSystemFactory,
38+
DeltaLakeFileSystemFactory fileSystemFactory,
3939
int maxOpenWriters,
4040
JsonCodec<DataFileInfo> dataFileInfoCodec,
4141
Location tableLocation,
42+
VendedCredentialsHandle credentialsHandle,
4243
Location outputPath,
4344
ConnectorSession session,
4445
DeltaLakeWriterStats stats,
@@ -54,6 +55,7 @@ public DeltaLakeCdfPageSink(
5455
maxOpenWriters,
5556
dataFileInfoCodec,
5657
tableLocation,
58+
credentialsHandle,
5759
outputPath,
5860
session,
5961
stats,
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake;
15+
16+
import io.trino.filesystem.TrinoFileSystem;
17+
import io.trino.filesystem.TrinoFileSystemFactory;
18+
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
19+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
20+
import io.trino.spi.connector.ConnectorSession;
21+
import io.trino.spi.security.ConnectorIdentity;
22+
23+
public interface DeltaLakeFileSystemFactory
24+
extends TrinoFileSystemFactory
25+
{
26+
default TrinoFileSystem create(ConnectorSession session, DeltaLakeTableHandle table)
27+
{
28+
return create(session, table.toCredentialsHandle());
29+
}
30+
31+
default TrinoFileSystem create(ConnectorSession session, DeltaMetastoreTable table)
32+
{
33+
return create(session, VendedCredentialsHandle.of(table));
34+
}
35+
36+
TrinoFileSystem create(ConnectorSession session, VendedCredentialsHandle table);
37+
38+
/**
39+
* For external table create/write using location
40+
*/
41+
TrinoFileSystem create(ConnectorSession session, String tableLocation);
42+
43+
/**
44+
* @deprecated Use {@link #create(ConnectorSession, VendedCredentialsHandle)} or {@link #create(ConnectorSession, String)}
45+
* instead. The new methods can potentially support vending credentials and may pass more information
46+
* when creating {@link TrinoFileSystem} in the future.
47+
*/
48+
@Deprecated
49+
@Override
50+
default TrinoFileSystem create(ConnectorIdentity identity)
51+
{
52+
throw new UnsupportedOperationException();
53+
}
54+
55+
/**
56+
* @deprecated Use {@link #create(ConnectorSession, VendedCredentialsHandle)} or {@link #create(ConnectorSession, String)}
57+
* instead. The new methods can potentially support vending credentials and may pass more information
58+
* when creating {@link TrinoFileSystem} in the future.
59+
*/
60+
@Deprecated
61+
@Override
62+
default TrinoFileSystem create(ConnectorSession session)
63+
{
64+
throw new UnsupportedOperationException();
65+
}
66+
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import io.trino.filesystem.TrinoFileSystem;
18-
import io.trino.filesystem.TrinoFileSystemFactory;
1918
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
2019
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
2120
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
@@ -46,7 +45,7 @@ public class DeltaLakeHistoryTable
4645
{
4746
public DeltaLakeHistoryTable(
4847
DeltaMetastoreTable table,
49-
TrinoFileSystemFactory fileSystemFactory,
48+
DeltaLakeFileSystemFactory fileSystemFactory,
5049
TransactionLogAccess transactionLogAccess,
5150
TypeManager typeManager)
5251
{

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeInsertTableHandle.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.plugin.deltalake;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
1718
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
1819
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
1920
import io.trino.spi.connector.ConnectorInsertTableHandle;
@@ -30,7 +31,8 @@ public record DeltaLakeInsertTableHandle(
3031
ProtocolEntry protocolEntry,
3132
List<DeltaLakeColumnHandle> inputColumns,
3233
long readVersion,
33-
boolean retriesEnabled)
34+
boolean retriesEnabled,
35+
VendedCredentialsHandle credentialsHandle)
3436
implements ConnectorInsertTableHandle
3537
{
3638
public DeltaLakeInsertTableHandle
@@ -40,6 +42,7 @@ public record DeltaLakeInsertTableHandle(
4042
requireNonNull(protocolEntry, "protocolEntry is null");
4143
inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
4244
requireNonNull(location, "location is null");
45+
requireNonNull(credentialsHandle, "credentialsHandle is null");
4346
}
4447

4548
@Override

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.airlift.slice.Slices;
2222
import io.trino.filesystem.Location;
2323
import io.trino.filesystem.TrinoFileSystem;
24-
import io.trino.filesystem.TrinoFileSystemFactory;
2524
import io.trino.filesystem.TrinoInputFile;
2625
import io.trino.parquet.ParquetDataSource;
2726
import io.trino.parquet.ParquetReaderOptions;
@@ -31,6 +30,7 @@
3130
import io.trino.parquet.writer.ParquetWriterOptions;
3231
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
3332
import io.trino.plugin.deltalake.delete.RoaringBitmapArray;
33+
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
3434
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
3535
import io.trino.plugin.hive.parquet.ParquetFileWriter;
3636
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
@@ -140,14 +140,15 @@ public class DeltaLakeMergeSink
140140

141141
public DeltaLakeMergeSink(
142142
TypeOperators typeOperators,
143-
TrinoFileSystemFactory fileSystemFactory,
143+
DeltaLakeFileSystemFactory fileSystemFactory,
144144
ConnectorSession session,
145145
DateTimeZone parquetDateTimeZone,
146146
String trinoVersion,
147147
JsonCodec<DataFileInfo> dataFileInfoCodec,
148148
JsonCodec<DeltaLakeMergeResult> mergeResultJsonCodec,
149149
DeltaLakeWriterStats writerStats,
150150
Location rootTableLocation,
151+
VendedCredentialsHandle credentialsHandle,
151152
ConnectorPageSink insertPageSink,
152153
List<DeltaLakeColumnHandle> tableColumns,
153154
int domainCompactionThreshold,
@@ -163,7 +164,7 @@ public DeltaLakeMergeSink(
163164
{
164165
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
165166
this.session = requireNonNull(session, "session is null");
166-
this.fileSystem = fileSystemFactory.create(session);
167+
this.fileSystem = fileSystemFactory.create(session, credentialsHandle);
167168
this.parquetDateTimeZone = requireNonNull(parquetDateTimeZone, "parquetDateTimeZone is null");
168169
this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null");
169170
this.dataFileInfoCodec = requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");

0 commit comments

Comments
 (0)