17
17
import io .airlift .slice .Slices ;
18
18
import io .trino .filesystem .TrinoFileSystem ;
19
19
import io .trino .plugin .iceberg .IcebergUtil ;
20
+ import io .trino .plugin .iceberg .StructLikeWrapperWithFieldIdToIndex ;
20
21
import io .trino .plugin .iceberg .fileio .ForwardingFileIoFactory ;
21
22
import io .trino .plugin .iceberg .system .FilesTable ;
22
23
import io .trino .plugin .iceberg .system .IcebergPartitionColumn ;
63
64
import static io .trino .plugin .iceberg .IcebergTypes .convertIcebergValueToTrino ;
64
65
import static io .trino .plugin .iceberg .IcebergUtil .primitiveFieldTypes ;
65
66
import static io .trino .plugin .iceberg .IcebergUtil .readerForManifest ;
67
+ import static io .trino .plugin .iceberg .StructLikeWrapperWithFieldIdToIndex .createStructLikeWrapper ;
66
68
import static io .trino .plugin .iceberg .system .FilesTable .COLUMN_SIZES_COLUMN_NAME ;
67
69
import static io .trino .plugin .iceberg .system .FilesTable .CONTENT_COLUMN_NAME ;
68
70
import static io .trino .plugin .iceberg .system .FilesTable .EQUALITY_IDS_COLUMN_NAME ;
@@ -101,6 +103,7 @@ public final class FilesTablePageSource
101
103
private final Schema schema ;
102
104
private final Schema metadataSchema ;
103
105
private final Map <Integer , PrimitiveType > idToTypeMapping ;
106
+ private final Map <Integer , PartitionSpec > idToPartitionSpecMapping ;
104
107
private final List <PartitionField > partitionFields ;
105
108
private final Optional <IcebergPartitionColumn > partitionColumnType ;
106
109
private final List <Types .NestedField > primitiveFields ;
@@ -123,15 +126,15 @@ public FilesTablePageSource(
123
126
this .schema = SchemaParser .fromJson (requireNonNull (split .schemaJson (), "schema is null" ));
124
127
this .metadataSchema = SchemaParser .fromJson (requireNonNull (split .metadataTableJson (), "metadataSchema is null" ));
125
128
this .idToTypeMapping = primitiveFieldTypes (schema );
126
- Map < Integer , PartitionSpec > specs = split .partitionSpecsByIdJson ().entrySet ().stream ().collect (toImmutableMap (
129
+ this . idToPartitionSpecMapping = split .partitionSpecsByIdJson ().entrySet ().stream ().collect (toImmutableMap (
127
130
Map .Entry ::getKey ,
128
- entry -> PartitionSpecParser .fromJson (SchemaParser . fromJson ( split . schemaJson ()) , entry .getValue ())));
129
- this .partitionFields = getAllPartitionFields (schema , specs );
131
+ entry -> PartitionSpecParser .fromJson (schema , entry .getValue ())));
132
+ this .partitionFields = getAllPartitionFields (schema , idToPartitionSpecMapping );
130
133
this .partitionColumnType = getPartitionColumnType (typeManager , partitionFields , schema );
131
134
this .primitiveFields = IcebergUtil .primitiveFields (schema ).stream ()
132
135
.sorted (Comparator .comparing (Types .NestedField ::name ))
133
136
.collect (toImmutableList ());
134
- ManifestReader <? extends ContentFile <?>> manifestReader = closer .register (readerForManifest (split .manifestFile (), fileIoFactory .create (trinoFileSystem ), specs ));
137
+ ManifestReader <? extends ContentFile <?>> manifestReader = closer .register (readerForManifest (split .manifestFile (), fileIoFactory .create (trinoFileSystem ), idToPartitionSpecMapping ));
135
138
// TODO figure out why selecting the specific column causes null to be returned for offset_splits
136
139
this .contentIterator = closer .register (requireNonNull (manifestReader , "manifestReader is null" ).iterator ());
137
140
this .pageBuilder = new PageBuilder (requiredColumns .stream ().map (column -> {
@@ -195,20 +198,26 @@ public SourcePage getNextSourcePage()
195
198
writeValueOrNull (pageBuilder , SPEC_ID_COLUMN_NAME , contentFile ::specId , INTEGER ::writeInt );
196
199
// partitions
197
200
if (partitionColumnType .isPresent () && columnNameToIndex .containsKey (FilesTable .PARTITION_COLUMN_NAME )) {
201
+ PartitionSpec partitionSpec = idToPartitionSpecMapping .get (contentFile .specId ());
202
+ StructLikeWrapperWithFieldIdToIndex partitionStruct = createStructLikeWrapper (partitionSpec .partitionType (), contentFile .partition ());
198
203
List <Type > partitionTypes = partitionTypes (partitionFields , idToTypeMapping );
204
+ List <? extends Class <?>> partitionColumnClass = partitionTypes .stream ()
205
+ .map (type -> type .typeId ().javaClass ())
206
+ .collect (toImmutableList ());
199
207
List <io .trino .spi .type .Type > partitionColumnTypes = partitionColumnType .orElseThrow ().rowType ().getFields ().stream ()
200
208
.map (RowType .Field ::getType )
201
209
.collect (toImmutableList ());
202
210
203
211
if (pageBuilder .getBlockBuilder (columnNameToIndex .get (FilesTable .PARTITION_COLUMN_NAME )) instanceof RowBlockBuilder rowBlockBuilder ) {
204
212
rowBlockBuilder .buildEntry (fields -> {
205
213
for (int i = 0 ; i < partitionColumnTypes .size (); i ++) {
206
- Type type = partitionTypes .get (i );
207
214
io .trino .spi .type .Type trinoType = partitionColumnType .get ().rowType ().getFields ().get (i ).getType ();
208
215
Object value = null ;
209
216
Integer fieldId = partitionColumnType .get ().fieldIds ().get (i );
210
- if (fieldId != null ) {
211
- value = convertIcebergValueToTrino (type , contentFile .partition ().get (i , type .typeId ().javaClass ()));
217
+ if (partitionStruct .getFieldIdToIndex ().containsKey (fieldId )) {
218
+ value = convertIcebergValueToTrino (
219
+ partitionTypes .get (i ),
220
+ partitionStruct .getStructLikeWrapper ().get ().get (partitionStruct .getFieldIdToIndex ().get (fieldId ), partitionColumnClass .get (i )));
212
221
}
213
222
writeNativeValue (trinoType , fields .get (i ), value );
214
223
}
0 commit comments