Skip to content

Commit 8217319

Browse files
committed
Add Unit Tests and Product Tests for Symlink Tables
1 parent a120690 commit 8217319

File tree

9 files changed

+436
-36
lines changed

9 files changed

+436
-36
lines changed

presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@
156156
import static com.facebook.presto.hive.HiveErrorCode.HIVE_TABLE_BUCKETING_IS_IGNORED;
157157
import static com.facebook.presto.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
158158
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
159+
import static com.facebook.presto.hive.HiveStorageFormat.TEXTFILE;
159160
import static com.facebook.presto.hive.metastore.MetastoreUtil.HIVE_DEFAULT_DYNAMIC_PARTITION;
160161
import static com.facebook.presto.hive.metastore.MetastoreUtil.checkCondition;
161162
import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
@@ -363,6 +364,20 @@ public static Optional<CompressionCodec> getCompressionCodec(TextInputFormat inp
363364
throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, "Missing SerDe for SymlinkTextInputFormat");
364365
}
365366

367+
/*
368+
* https://github.com/apache/hive/blob/b240eb3266d4736424678d6c71c3c6f6a6fdbf38/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java#L47-L52
369+
* According to the Hive implementation of SymlinkTextInputFormat, the target input data should be in TextInputFormat.
370+
*
371+
* But Delta Lake provides an integration with Presto using Symlink Tables with target input data as MapredParquetInputFormat.
372+
* https://docs.delta.io/latest/presto-integration.html
373+
*
374+
* To comply with the Hive implementation, we use TextInputFormat whenever the SerDe is LazySimpleSerDe; any other SerDe is resolved to its matching storage format below.
375+
*/
376+
if (serDe.equals(TEXTFILE.getSerDe())) {
377+
inputFormatClass = TextInputFormat.class;
378+
return ReflectionUtils.newInstance(inputFormatClass, jobConf);
379+
}
380+
366381
for (HiveStorageFormat hiveStorageFormat : HiveStorageFormat.values()) {
367382
if (serDe.equals(hiveStorageFormat.getSerDe())) {
368383
inputFormatClass = getInputFormatClass(jobConf, hiveStorageFormat.getInputFormat());

presto-hive/src/main/java/com/facebook/presto/hive/StoragePartitionLoader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.facebook.presto.spi.ConnectorSession;
2525
import com.facebook.presto.spi.PrestoException;
2626
import com.facebook.presto.spi.SchemaTableName;
27+
import com.google.common.annotations.VisibleForTesting;
2728
import com.google.common.collect.ArrayListMultimap;
2829
import com.google.common.collect.ImmutableList;
2930
import com.google.common.collect.Iterators;
@@ -239,7 +240,8 @@ private ListenableFuture<?> handleSymlinkTextInputFormat(
239240
targetPaths);
240241
}
241242

242-
private Iterator<InternalHiveSplit> getSymlinkIterator(
243+
@VisibleForTesting
244+
Iterator<InternalHiveSplit> getSymlinkIterator(
243245
Path path,
244246
boolean s3SelectPushdownEnabled,
245247
Storage storage,

presto-hive/src/test/java/com/facebook/presto/hive/TestBackgroundHiveSplitLoader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public class TestBackgroundHiveSplitLoader
123123
private static final Optional<HiveBucketProperty> BUCKET_PROPERTY = Optional.of(
124124
new HiveBucketProperty(ImmutableList.of("col1"), BUCKET_COUNT, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty()));
125125

126-
private static final Table SIMPLE_TABLE = table(ImmutableList.of(), Optional.empty());
126+
public static final Table SIMPLE_TABLE = table(ImmutableList.of(), Optional.empty());
127127
private static final Table PARTITIONED_TABLE = table(PARTITION_COLUMNS, BUCKET_PROPERTY);
128128

129129
@Test
@@ -541,7 +541,7 @@ private static BackgroundHiveSplitLoader backgroundHiveSplitLoader(
541541
false);
542542
}
543543

544-
private static List<HivePartitionMetadata> samplePartitionMetadatas()
544+
public static List<HivePartitionMetadata> samplePartitionMetadatas()
545545
{
546546
return ImmutableList.of(
547547
new HivePartitionMetadata(

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveUtil.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717
import com.facebook.presto.hive.metastore.Storage;
1818
import com.facebook.presto.hive.metastore.StorageFormat;
1919
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
20+
import com.facebook.presto.spi.PrestoException;
2021
import com.google.common.collect.ImmutableList;
2122
import com.google.common.collect.ImmutableMap;
2223
import com.google.common.collect.ImmutableSet;
2324
import io.airlift.slice.Slices;
2425
import org.apache.hadoop.conf.Configuration;
2526
import org.apache.hadoop.hive.metastore.Warehouse;
2627
import org.apache.hadoop.hive.metastore.api.MetaException;
28+
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
2729
import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
2830
import org.apache.hadoop.hive.serde2.thrift.test.IntString;
31+
import org.apache.hadoop.mapred.InputFormat;
2932
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
3033
import org.apache.thrift.protocol.TBinaryProtocol;
3134
import org.joda.time.DateTime;
@@ -49,6 +52,9 @@
4952
import static com.facebook.presto.common.type.DateType.DATE;
5053
import static com.facebook.presto.common.type.IntegerType.INTEGER;
5154
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
55+
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
56+
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
57+
import static com.facebook.presto.hive.HiveStorageFormat.TEXTFILE;
5258
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
5359
import static com.facebook.presto.hive.HiveUtil.CLIENT_TAGS_DELIMITER;
5460
import static com.facebook.presto.hive.HiveUtil.CUSTOM_FILE_SPLIT_CLASS_KEY;
@@ -75,8 +81,10 @@
7581
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_CLASS;
7682
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
7783
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
84+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
7885
import static org.testng.Assert.assertEquals;
7986
import static org.testng.Assert.assertFalse;
87+
import static org.testng.Assert.assertNotNull;
8088
import static org.testng.Assert.assertTrue;
8189

8290
public class TestHiveUtil
@@ -215,6 +223,84 @@ public void testParsePartitionValue()
215223
assertEquals(prestoValue, Slices.utf8Slice("USA"));
216224
}
217225

226+
@Test
227+
public void testGetInputFormatValidInput()
228+
{
229+
Configuration configuration = new Configuration();
230+
String inputFormatName = ORC.getInputFormat();
231+
String serDe = ORC.getSerDe();
232+
boolean symlinkTarget = false;
233+
234+
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, serDe, symlinkTarget);
235+
assertNotNull(inputFormat, "InputFormat should not be null for valid input");
236+
assertEquals(inputFormat.getClass().getName(), ORC.getInputFormat());
237+
}
238+
239+
@Test
240+
public void testGetInputFormatInvalidInputFormatName()
241+
{
242+
Configuration configuration = new Configuration();
243+
String inputFormatName = "invalid.InputFormatName";
244+
String serDe = ORC.getSerDe();
245+
boolean symlinkTarget = false;
246+
247+
assertThatThrownBy(() -> HiveUtil.getInputFormat(configuration, inputFormatName, serDe, symlinkTarget))
248+
.isInstanceOf(PrestoException.class)
249+
.hasStackTraceContaining("Unable to create input format invalid.InputFormatName");
250+
}
251+
252+
@Test
253+
public void testGetInputFormatMissingSerDeForSymlinkTextInputFormat()
254+
{
255+
Configuration configuration = new Configuration();
256+
String inputFormatName = SymlinkTextInputFormat.class.getName();
257+
String serDe = null;
258+
boolean symlinkTarget = true;
259+
260+
assertThatThrownBy(() -> HiveUtil.getInputFormat(configuration, inputFormatName, serDe, symlinkTarget))
261+
.isInstanceOf(PrestoException.class)
262+
.hasStackTraceContaining("Missing SerDe for SymlinkTextInputFormat");
263+
}
264+
265+
@Test
266+
public void testGetInputFormatUnsupportedSerDeForSymlinkTextInputFormat()
267+
{
268+
Configuration configuration = new Configuration();
269+
String inputFormatName = SymlinkTextInputFormat.class.getName();
270+
String serDe = "unsupported.SerDe";
271+
boolean symlinkTarget = true;
272+
273+
assertThatThrownBy(() -> HiveUtil.getInputFormat(configuration, inputFormatName, serDe, symlinkTarget))
274+
.isInstanceOf(PrestoException.class)
275+
.hasStackTraceContaining("Unsupported SerDe for SymlinkTextInputFormat: unsupported.SerDe");
276+
}
277+
278+
@Test
279+
public void testGetInputFormatForAllSupportedSerDesForSymlinkTextInputFormat()
280+
{
281+
Configuration configuration = new Configuration();
282+
boolean symlinkTarget = true;
283+
284+
/*
285+
* https://github.com/apache/hive/blob/b240eb3266d4736424678d6c71c3c6f6a6fdbf38/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java#L47-L52
286+
* According to the Hive implementation of SymlinkTextInputFormat, the target input data should be in TextInputFormat.
287+
*
288+
* But another common use-case of Symlink Tables is to read Delta Lake Symlink Tables with target input data as MapredParquetInputFormat
289+
* https://docs.delta.io/latest/presto-integration.html
290+
*/
291+
List<HiveStorageFormat> supportedFormats = ImmutableList.of(PARQUET, TEXTFILE);
292+
293+
for (HiveStorageFormat hiveStorageFormat : supportedFormats) {
294+
String inputFormatName = SymlinkTextInputFormat.class.getName();
295+
String serDe = hiveStorageFormat.getSerDe();
296+
297+
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, serDe, symlinkTarget);
298+
299+
assertNotNull(inputFormat, "InputFormat should not be null for valid SerDe: " + serDe);
300+
assertEquals(inputFormat.getClass().getName(), hiveStorageFormat.getInputFormat());
301+
}
302+
}
303+
218304
private static void assertToPartitionValues(String partitionName)
219305
throws MetaException
220306
{
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.hive;
15+
16+
import com.facebook.presto.hive.TestBackgroundHiveSplitLoader.TestingHdfsEnvironment;
17+
import com.facebook.presto.spi.ConnectorSession;
18+
import com.facebook.presto.testing.TestingConnectorSession;
19+
import com.google.common.collect.ImmutableList;
20+
import com.google.common.collect.ImmutableMap;
21+
import io.airlift.units.DataSize;
22+
import io.airlift.units.Duration;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.fs.BlockLocation;
25+
import org.apache.hadoop.fs.LocatedFileStatus;
26+
import org.apache.hadoop.fs.Path;
27+
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
28+
import org.apache.hadoop.mapred.InputFormat;
29+
import org.testng.annotations.Test;
30+
31+
import java.util.Iterator;
32+
import java.util.List;
33+
import java.util.Optional;
34+
import java.util.concurrent.ConcurrentLinkedDeque;
35+
import java.util.concurrent.TimeUnit;
36+
37+
import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled;
38+
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
39+
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
40+
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
41+
import static com.facebook.presto.hive.HiveUtil.buildDirectoryContextProperties;
42+
import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
43+
import static com.facebook.presto.hive.StoragePartitionLoader.BucketSplitInfo.createBucketSplitInfo;
44+
import static com.facebook.presto.hive.TestBackgroundHiveSplitLoader.SIMPLE_TABLE;
45+
import static com.facebook.presto.hive.TestBackgroundHiveSplitLoader.samplePartitionMetadatas;
46+
import static com.google.common.collect.ImmutableList.toImmutableList;
47+
import static com.google.common.collect.Iterables.getOnlyElement;
48+
import static io.airlift.units.DataSize.Unit.GIGABYTE;
49+
import static io.airlift.units.DataSize.Unit.KILOBYTE;
50+
import static org.testng.Assert.assertEquals;
51+
52+
public class TestStoragePartitionLoader
53+
{
54+
@Test
55+
public void testGetSymlinkIterator()
56+
throws Exception
57+
{
58+
CachingDirectoryLister directoryLister = new CachingDirectoryLister(
59+
new HadoopDirectoryLister(),
60+
new Duration(5, TimeUnit.MINUTES),
61+
new DataSize(100, KILOBYTE),
62+
ImmutableList.of());
63+
64+
Configuration configuration = new Configuration(false);
65+
66+
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(
67+
configuration,
68+
SymlinkTextInputFormat.class.getName(),
69+
PARQUET.getSerDe(),
70+
true);
71+
72+
Path firstFilePath = new Path("hdfs://hadoop:9000/db_name/table_name/file1");
73+
Path secondFilePath = new Path("hdfs://hadoop:9000/db_name/table_name/file2");
74+
List<Path> paths = ImmutableList.of(firstFilePath, secondFilePath);
75+
List<LocatedFileStatus> files = paths.stream()
76+
.map(path -> locatedFileStatus(path, 0L))
77+
.collect(toImmutableList());
78+
79+
ConnectorSession connectorSession = new TestingConnectorSession(getAllSessionProperties(
80+
new HiveClientConfig().setMaxSplitSize(new DataSize(1.0, GIGABYTE))
81+
.setFileStatusCacheTables(""),
82+
new HiveCommonClientConfig()));
83+
84+
StoragePartitionLoader storagePartitionLoader = storagePartitionLoader(files, directoryLister, connectorSession);
85+
86+
HdfsContext hdfsContext = new HdfsContext(
87+
connectorSession,
88+
SIMPLE_TABLE.getDatabaseName(),
89+
SIMPLE_TABLE.getTableName(),
90+
SIMPLE_TABLE.getStorage().getLocation(),
91+
false);
92+
93+
HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(
94+
IGNORED,
95+
isUseListDirectoryCache(connectorSession),
96+
isSkipEmptyFilesEnabled(connectorSession),
97+
hdfsContext.getIdentity(),
98+
buildDirectoryContextProperties(connectorSession),
99+
connectorSession.getRuntimeStats());
100+
101+
Iterator<InternalHiveSplit> symlinkIterator = storagePartitionLoader.getSymlinkIterator(
102+
new Path("hdfs://hadoop:9000/db_name/table_name/symlink_manifest"),
103+
false,
104+
SIMPLE_TABLE.getStorage(),
105+
ImmutableList.of(),
106+
"UNPARTITIONED",
107+
SIMPLE_TABLE.getDataColumns().size(),
108+
getOnlyElement(samplePartitionMetadatas()),
109+
true,
110+
new Path("hdfs://hadoop:9000/db_name/table_name/"),
111+
paths,
112+
inputFormat,
113+
hiveDirectoryContext);
114+
115+
List<InternalHiveSplit> splits = ImmutableList.copyOf(symlinkIterator);
116+
assertEquals(splits.size(), 2);
117+
assertEquals(splits.get(0).getPath(), firstFilePath.toString());
118+
assertEquals(splits.get(1).getPath(), secondFilePath.toString());
119+
}
120+
121+
private static LocatedFileStatus locatedFileStatus(Path path, long fileSize)
122+
{
123+
return new LocatedFileStatus(
124+
fileSize,
125+
false,
126+
0,
127+
0L,
128+
0L,
129+
0L,
130+
null,
131+
null,
132+
null,
133+
null,
134+
path,
135+
new org.apache.hadoop.fs.BlockLocation[]{new BlockLocation(new String[1], new String[]{"localhost"}, 0, fileSize)});
136+
}
137+
138+
private static StoragePartitionLoader storagePartitionLoader(
139+
List<LocatedFileStatus> files,
140+
DirectoryLister directoryLister,
141+
ConnectorSession connectorSession)
142+
{
143+
return new StoragePartitionLoader(
144+
SIMPLE_TABLE,
145+
ImmutableMap.of(),
146+
createBucketSplitInfo(Optional.empty(), Optional.empty()),
147+
connectorSession,
148+
new TestingHdfsEnvironment(files),
149+
new NamenodeStats(),
150+
directoryLister,
151+
new ConcurrentLinkedDeque<>(),
152+
false,
153+
false,
154+
false);
155+
}
156+
}

presto-product-tests/src/main/java/com/facebook/presto/tests/TestGroups.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public final class TestGroups
6969
public static final String HIVE_LIST_CACHING = "hive_list_caching";
7070
public static final String INVALIDATE_METASTORE_CACHE = "invalidate_metastore_cache";
7171
public static final String MIXED_CASE = "mixed_case";
72+
public static final String HIVE_SYMLINK_TABLE = "hive_symlink_table";
7273

7374
private TestGroups() {}
7475
}

0 commit comments

Comments
 (0)