@@ -22,6 +22,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A wrapper {@link ByteBufferAllocator} implementation that tracks whether all allocated buffers are released. It
@@ -37,7 +39,9 @@ public final class TrackingByteBufferAllocator implements ByteBufferAllocator, A
*
* @see ByteBufferAllocationStacktraceException
*/
private static final boolean DEBUG = false;
private static final boolean DEBUG = true;

private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferAllocator.class);

public static TrackingByteBufferAllocator wrap(ByteBufferAllocator allocator) {
return new TrackingByteBufferAllocator(allocator);
@@ -69,6 +73,11 @@ public boolean equals(Object o) {
public int hashCode() {
return hashCode;
}

@Override
public String toString() {
return buffer.toString();
}
}

public static class LeakDetectorHeapByteBufferAllocatorException extends RuntimeException {
@@ -133,14 +142,22 @@ private TrackingByteBufferAllocator(ByteBufferAllocator allocator) {
@Override
public ByteBuffer allocate(int size) {
ByteBuffer buffer = allocator.allocate(size);
allocated.put(new Key(buffer), ByteBufferAllocationStacktraceException.create());
final ByteBufferAllocationStacktraceException ex = ByteBufferAllocationStacktraceException.create();
final Key key = new Key(buffer);
allocated.put(key, ex);
LOG.debug("Creating ByteBuffer:{} size {} {}", key.hashCode(), size, buffer);
if (DEBUG) {
LOG.debug("Stack", ex);
}
return buffer;
}

@Override
public void release(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
Objects.requireNonNull(b);
if (allocated.remove(new Key(b)) == null) {
final Key key = new Key(b);
LOG.debug("Releasing ByteBuffer: {}: {}", key.hashCode(), b);
if (allocated.remove(key) == null) {
throw new ReleasingUnallocatedByteBufferException();
}
allocator.release(b);
@@ -156,6 +173,7 @@ public boolean isDirect() {
@Override
public void close() throws LeakedByteBufferException {
if (!allocated.isEmpty()) {
allocated.keySet().forEach(key -> LOG.warn("Unreleased ByteBuffer {}; {}", key.hashCode(), key));
LeakedByteBufferException ex = new LeakedByteBufferException(
allocated.size(), allocated.values().iterator().next());
allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
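For context, a minimal usage sketch of the tracking allocator follows; it is illustrative only and not part of this PR. It assumes HeapByteBufferAllocator exposes a public no-arg constructor and that LeakedByteBufferException is unchecked, consistent with the class shown above.

// Sketch: wrap a real allocator, release one buffer, leak another, and let close() report it.
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;

class TrackingAllocatorSketch {
  public static void main(String[] args) {
    TrackingByteBufferAllocator allocator =
        TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator());
    ByteBuffer released = allocator.allocate(1024);
    allocator.release(released);                 // removed from the tracking map
    ByteBuffer leaked = allocator.allocate(512); // intentionally never released
    // One buffer is still tracked, so close() logs the unreleased buffer
    // (the new LOG.warn above) and throws LeakedByteBufferException.
    allocator.close();
  }
}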
@@ -1308,10 +1308,15 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
// Request a vectored read;
f.readVectored(ranges, options.getAllocator());
List<ByteBuffer> buffers = new ArrayList<>(allParts.size());
int k = 0;
for (ConsecutivePartList consecutivePart : allParts) {
ParquetFileRange currRange = ranges.get(k++);
consecutivePart.readFromVectoredRange(currRange, builder);
try {
for (ConsecutivePartList consecutivePart : allParts) {
ParquetFileRange currRange = ranges.get(k++);
buffers.add(consecutivePart.readFromVectoredRange(currRange, builder));
}
} finally {
builder.addBuffersToRelease(buffers);
}
}

@@ -2241,11 +2246,16 @@ private void setReadMetrics(long startNs, long len) {
/**
* Populate data in a parquet file range from a vectored range; will block for up
* to {@link #HADOOP_VECTORED_READ_TIMEOUT_SECONDS} seconds.
*
* @param currRange range to populate.
* @param builder used to build the chunk list to read the pages for the different columns.
*
* @return the buffer, for queuing for release later.
*
* @throws IOException if there is an error while reading from the stream, including a timeout.
*/
public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder) throws IOException {
public ByteBuffer readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder)
throws IOException {
ByteBuffer buffer;
final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
long readStart = System.nanoTime();
@@ -2268,6 +2278,7 @@ public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder b
for (ChunkDescriptor descriptor : chunks) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
}
return buffer;
}

/**
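The return-value change above exists so that readVectored can queue every buffer for release even when a later range fails. A rough sketch of the timeout-bounded wait the Javadoc describes is below; the CompletableFuture accessor and the helper name are illustrative assumptions, not the method's actual code.

// Sketch: block on an async range read for a bounded time, surfacing timeouts as IOExceptions.
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class VectoredRangeAwait {
  static ByteBuffer awaitRange(CompletableFuture<ByteBuffer> dataFuture, long timeoutSeconds)
      throws IOException {
    try {
      // Wait at most timeoutSeconds, mirroring HADOOP_VECTORED_READ_TIMEOUT_SECONDS.
      return dataFuture.get(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      throw new IOException("Timed out waiting for vectored range after " + timeoutSeconds + "s", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new InterruptedIOException("Interrupted while waiting for vectored range");
    } catch (ExecutionException e) {
      throw new IOException("Vectored range read failed", e.getCause());
    }
  }
}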
@@ -28,6 +28,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,6 +42,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -529,6 +531,14 @@ private void writeEncryptedParquetFile(
} catch (Exception e) {
addErrorToErrorCollectorAndLog("Failed writing " + file.toString(), e, encryptionConfiguration, null);
}
// remove the CRC file so that Hadoop local filesystem doesn't slice buffers on
// vector reads.
try {
final LocalFileSystem local = FileSystem.getLocal(new Configuration());
local.delete(local.getChecksumFile(file), false);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private Path getFileName(Path root, EncryptionConfiguration encryptionConfiguration, int threadNumber) {
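The CRC cleanup just above is repeated in several test files in this diff. Extracted as a standalone sketch, the step looks like the helper below; the helper class and method name are hypothetical, while FileSystem.getLocal and getChecksumFile are the same calls the diff uses.

// Sketch: delete the local ".<file>.crc" sidecar so the Hadoop local filesystem
// does not slice buffers on vectored reads.
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

final class CrcCleanup {
  private CrcCleanup() {}

  static void deleteChecksumFile(Path file) {
    try {
      LocalFileSystem local = FileSystem.getLocal(new Configuration());
      local.delete(local.getChecksumFile(file), false);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}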
@@ -130,6 +130,8 @@ public static List<User> makeUsers() {
public static void setup() throws IOException {
users = makeUsers();
phonebookFile = PhoneBookWriter.writeToFile(users);
// remove the CRC file
new File(phonebookFile.getParentFile(), "." + phonebookFile.getName() + ".crc").delete();
}

private static interface UserFilter {
@@ -64,6 +64,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
@@ -363,6 +365,10 @@ private static void writePhoneBookToFile(
.withWriterVersion(parquetVersion),
DATA);
}
// remove the CRC file so that Hadoop local filesystem doesn't slice buffers on
// vector reads.
final LocalFileSystem local = FileSystem.getLocal(new Configuration());
local.delete(local.getChecksumFile(file), false);
}

private static FileEncryptionProperties getFileEncryptionProperties() {
@@ -21,6 +21,7 @@
import static org.apache.parquet.filter2.predicate.FilterApi.in;
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
@@ -34,6 +35,8 @@
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
@@ -50,6 +53,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public class TestParquetReader {
@@ -59,8 +64,10 @@ public class TestParquetReader {
private static final Path STATIC_FILE_WITHOUT_COL_INDEXES =
createPathFromCP("/test-file-with-no-column-indexes-1.parquet");
private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(makeUsers(1000));
private static final Logger LOG = LoggerFactory.getLogger(TestParquetReader.class);

private final Path file;
private final boolean vectoredRead;
private final long fileSize;
private TrackingByteBufferAllocator allocator;

@@ -72,15 +79,19 @@ private static Path createPathFromCP(String path) {
}
}

public TestParquetReader(Path file) throws IOException {
public TestParquetReader(Path file, final boolean vectoredRead) throws IOException {
this.file = file;
this.vectoredRead = vectoredRead;
this.fileSize =
file.getFileSystem(new Configuration()).getFileStatus(file).getLen();
}

@Parameterized.Parameters
@Parameterized.Parameters(name = "file={0} vector={1}")
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {{FILE_V1}, {FILE_V2}, {STATIC_FILE_WITHOUT_COL_INDEXES}};
Object[][] data = new Object[][] {
{FILE_V1, false}, {FILE_V2, false}, {STATIC_FILE_WITHOUT_COL_INDEXES, false},
{FILE_V1, true}, {FILE_V2, true}, {STATIC_FILE_WITHOUT_COL_INDEXES, true}
};
return Arrays.asList(data);
}

@@ -96,6 +107,11 @@ public static void deleteFiles() throws IOException {
deleteFile(FILE_V2);
}

@Before
public void setup() throws IOException {
LOG.info("Test run with file {}, size {}; vectored={}", file, fileSize, vectoredRead);
}

private static void deleteFile(Path file) throws IOException {
file.getFileSystem(new Configuration()).delete(file, false);
}
@@ -139,6 +155,10 @@ private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVers
.withPageSize(pageSize)
.withWriterVersion(parquetVersion),
DATA);
// remove the CRC file so that Hadoop local filesystem doesn't slice buffers on
// vector reads.
final LocalFileSystem local = FileSystem.getLocal(new Configuration());
local.delete(local.getChecksumFile(file), false);
}

private List<PhoneBookWriter.User> readUsers(
@@ -153,8 +173,11 @@ private List<PhoneBookWriter.User> readUsers(
long rangeStart,
long rangeEnd)
throws IOException {
final Configuration conf = new Configuration();
conf.setBoolean(HADOOP_VECTORED_IO_ENABLED, vectoredRead);
return PhoneBookWriter.readUsers(
ParquetReader.builder(new GroupReadSupport(), file)
.withConf(conf)
.withAllocator(allocator)
.withFilter(filter)
.useDictionaryFilter(useOtherFiltering)
@@ -179,22 +202,22 @@ public void closeAllocator() {
public void testCurrentRowIndex() throws Exception {
ParquetReader<Group> reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP, allocator);
// Fetch row index without processing any row.
assertEquals(reader.getCurrentRowIndex(), -1);
assertEquals(-1, reader.getCurrentRowIndex());
reader.read();
assertEquals(reader.getCurrentRowIndex(), 0);
assertEquals(0, reader.getCurrentRowIndex());
// calling the same API again and again should return the same result.
assertEquals(reader.getCurrentRowIndex(), 0);
assertEquals(0, reader.getCurrentRowIndex());

reader.read();
assertEquals(reader.getCurrentRowIndex(), 1);
assertEquals(reader.getCurrentRowIndex(), 1);
assertEquals(1, reader.getCurrentRowIndex());
assertEquals(1, reader.getCurrentRowIndex());
long expectedCurrentRowIndex = 2L;
while (reader.read() != null) {
assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
expectedCurrentRowIndex++;
}
// reader.read() returned null, so the reader doesn't have any more rows.
assertEquals(reader.getCurrentRowIndex(), -1);
assertEquals(-1, reader.getCurrentRowIndex());
}

@Test
@@ -214,13 +237,13 @@ public void testSimpleFiltering() throws Exception {
// The readUsers also validates the rowIndex for each returned row.
List<PhoneBookWriter.User> filteredUsers1 =
readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true);
assertEquals(filteredUsers1.size(), 2L);
assertEquals(2L, filteredUsers1.size());
List<PhoneBookWriter.User> filteredUsers2 =
readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false);
assertEquals(filteredUsers2.size(), 2L);
assertEquals(2L, filteredUsers2.size());
List<PhoneBookWriter.User> filteredUsers3 =
readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false);
assertEquals(filteredUsers3.size(), 1000L);
assertEquals(1000L, filteredUsers3.size());
}

@Test
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -84,7 +84,7 @@
<spotless.version>2.30.0</spotless.version>
<shade.prefix>shaded.parquet</shade.prefix>
<!-- Guarantees no newer classes/methods/constants are used by parquet. -->
<hadoop.version>3.3.0</hadoop.version>
<hadoop.version>3.4.1</hadoop.version>
<parquet.format.version>2.11.0</parquet.format.version>
<previous.version>1.15.1</previous.version>
<thrift.executable>thrift</thrift.executable>