Conversation

@RussellSpitzer
Member

While I was working on V4 Parquet manifests, I found a bunch of test classes that do not properly parameterize formatVersion and therefore never run on V4. I've fixed all the ones I could find.

List<Object> parameters = Lists.newArrayList();
for (Boolean isStreamingMode : new Boolean[] {true, false}) {
-  for (int formatVersion : new int[] {1, 2}) {
+  for (int formatVersion : TestHelpers.ALL_VERSIONS) {
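As a standalone sketch of the corrected pattern, a cross-product parameter generator might look like the following (ALL_VERSIONS here is a made-up stand-in for Iceberg's TestHelpers.ALL_VERSIONS, assumed to list the supported format versions):

```java
import java.util.ArrayList;
import java.util.List;

public class ParameterSketch {
  // Hypothetical stand-in for TestHelpers.ALL_VERSIONS; the real constant
  // lives in Iceberg's test helpers and tracks the supported format versions.
  private static final int[] ALL_VERSIONS = {1, 2, 3, 4};

  public static List<Object[]> parameters() {
    List<Object[]> parameters = new ArrayList<>();
    for (boolean isStreamingMode : new boolean[] {true, false}) {
      for (int formatVersion : ALL_VERSIONS) {
        parameters.add(new Object[] {isStreamingMode, formatVersion});
      }
    }
    return parameters;
  }

  public static void main(String[] args) {
    // 2 streaming modes x 4 format versions = 8 parameter combinations
    System.out.println(parameters().size());
  }
}
```

Looping over a shared constant instead of a literal {1, 2} means new format versions are picked up automatically when the constant is updated, which is the point of the fix in this PR.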
Member Author

Someone who is a Flink expert, maybe @stevenzwu or @pvary, let me know whether it was intentional that this test does not check all versions.

Contributor

Here it is not intentional.

new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
for (Object[] catalogParams : CatalogTestBase.parameters()) {
-  for (int version : Arrays.asList(2, 3)) {
+  for (int version : TestHelpers.V2_AND_ABOVE) {
Member Author

Same here

Contributor

Here we have tests like testRewriteNoConflictWithEqualityDeletes which will not work with V1.
I think @nastra intentionally only added V2 and V3; maybe adding V4 would be nice.

List<Object> parameters = Lists.newArrayList();
for (Boolean isStreamingMode : new Boolean[] {true, false}) {
-  for (int formatVersion : new int[] {1, 2}) {
+  for (int formatVersion : TestHelpers.ALL_VERSIONS) {
Member Author

and here

new Object[] {FileFormat.PARQUET, 3, false, PlanningMode.DISTRIBUTED},
new Object[] {FileFormat.PARQUET, 3, true, PlanningMode.LOCAL},
};
List<Object[]> parameters = Lists.newArrayList();
Member Author

We had a really odd pattern for what we tested here before; I mimicked it below, only testing V4 with Parquet.

Contributor

@stevenzwu left a comment

this looks good to me after CI failure is fixed. Thanks for doing this. With these examples, hopefully new tests will be able to follow the same pattern.

It is hard to be 100% accurate because they are just plain numbers (not enum).

@RussellSpitzer
Member Author

this looks good to me after CI failure is fixed. Thanks for doing this. With these examples, hopefully new tests will be able to follow the same pattern.

It is hard to be 100% accurate because they are just plain numbers (not enum).

Yep, let me keep fixing CI issues; there are so many configs that I've been leaning on the automated CI to find any remaining bugs.

parameters.add(new Object[] {FileFormat.ORC, false, version});
parameters.add(new Object[] {FileFormat.ORC, true, version});
}
return parameters.toArray(new Object[0][]);
Contributor

Why are we only updating v3.4? What about v4.0 and v3.5?

Member Author

Ah, I missed that. Sorry, I've been going back and forth between build configs!

@RussellSpitzer
Member Author

I couldn't figure out exactly what the memory leak in our test suite is, but it seems to be related to the task statuses never getting cleared from the Spark context during the TestRewriteDataFilesAction test suite. Because the suite now runs with an additional config, the number of tasks increased dramatically, and I believe this was the root cause of the OOM.

I tried disabling the UI, but that didn't seem to help in any way; the statuses still stuck around.

So I decided to take a different tack and just optimize the test suite instead. The main thing I did was go through all of the "Spark sorts" and switch them to plain Java collection sorts. This has two outcomes: first, the test suite runs much faster, since adaptive shuffle was disabled for this suite, each sort required 200 tasks, and a local sort is much faster than the Spark mechanism; second, the number of tasks is reduced dramatically, which decreases the number of "task status" objects that hang around.

If this ends up still being an issue in the future, we can either track down the status issue or move some of these tests into a different test suite.
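A minimal sketch of the kind of substitution described above: sorting an already-collected result locally instead of asking Spark to sort it before collecting (the rows and their shape here are invented for illustration):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class LocalSortSketch {
  public static void main(String[] args) {
    // Stand-in for rows collected from Spark via collectAsList();
    // each row is {id, value}.
    List<long[]> rows = new ArrayList<>();
    rows.add(new long[] {3, 30});
    rows.add(new long[] {1, 10});
    rows.add(new long[] {2, 20});

    // Instead of something like df.sort("id").collectAsList(), which would
    // schedule a Spark shuffle (200 tasks with adaptive shuffle disabled),
    // sort the collected rows locally in the test JVM.
    rows.sort(Comparator.comparingLong(r -> r[0]));

    StringBuilder ids = new StringBuilder();
    for (long[] row : rows) {
      ids.append(row[0]).append(' ');
    }
    System.out.println(ids.toString().trim());
  }
}
```

For test-sized data this is both faster and creates no extra Spark tasks, which is why it also reduces the number of retained task-status objects.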

@RussellSpitzer
Member Author

OK, apparently that wasn't enough. It's fixed locally, but on Gradle I'm still getting an "Exited with return code 52" error from the workflow.

@RussellSpitzer
Member Author

OK, the issue is with direct memory... so now I have to track down memory usage in our Arrow readers.

@RussellSpitzer
Member Author

OK, so compaction is really leaking direct memory (at least in our tests).

[screenshot: direct memory usage graph]

Spark 3.4 - Testing

TestRewriteDataFilesAction starts at 5:06
-- testBinPackCombineMixedFiles at 5:11

TestRewritePositionDeleteFilesAction at 5:15
OOM (direct memory)

@RussellSpitzer
Member Author

RussellSpitzer commented Aug 25, 2025

My current understanding is that at least within our compaction code (and the rewrite-delete-files action code) we are leaking memory. The biggest contributor is TestCombineMixedFiles (probably because of the amount of data?). When I added additional test versions, the leak just got worse.

When I removed the Spark shuffles I reduced the leakage somewhat, but that just pushed out the OOM, since the memory is never released.

My current targets are:
- A memory leak during the Spark shuffle?
- A memory leak in our vectorized read code?

My guess is that these are the culprits, because we see this happen a ton during compaction but not really anywhere else in the test suite.

Tomorrow I'm going to disable our vectorized read path; if that fixes the issue, I'll have narrowed it down a bit more.

@RussellSpitzer
Member Author

With vectorized reads disabled (there's still a tiny leak, but if you look at the scale, it's in the range of 0 to 13 MB):

[screenshot: direct memory usage graph with vectorized reads disabled]

So we definitely have some issue with the vectorized read path.

@RussellSpitzer
Member Author

Test for the memory leak; I'm still working on nailing down where this is happening, but it's unrelated to the test parameterization.

diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
index a6b5166b3..316ef762e 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
@@ -18,9 +18,163 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.Files.localOutput;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.UUID;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
 public class TestParquetVectorizedScan extends TestParquetScan {
+  
+  private static final Configuration CONF = new Configuration();
+  
+  @TempDir private Path temp;
+
   @Override
   protected boolean vectorized() {
     return true;
   }
+
+  /**
+   * Test to verify that direct memory used during vectorized parquet reading is properly released.
+   * This creates a large (128MB) parquet file, reads it using the vectorized reader, collects all
+   * results, and verifies that direct memory is released after the read operation completes.
+   */
+  @Test
+  public void testDirectMemoryReleaseAfterLargeVectorizedRead() throws IOException {
+    // Create a schema with enough columns to generate significant data
+    org.apache.iceberg.Schema schema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.required(2, "data1", Types.StringType.get()),
+            Types.NestedField.required(3, "data2", Types.StringType.get()),
+            Types.NestedField.required(4, "data3", Types.StringType.get()),
+            Types.NestedField.required(5, "data4", Types.StringType.get()),
+            Types.NestedField.required(6, "data5", Types.StringType.get()),
+            Types.NestedField.required(7, "number1", Types.DoubleType.get()),
+            Types.NestedField.required(8, "number2", Types.DoubleType.get()));
+
+    File location = temp.resolve("memory_leak_test").toFile();
+
+    HadoopTables tables = new HadoopTables(CONF);
+    Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+    configureTable(table);
+
+    List<GenericData.Record> records = RandomData.generateList(schema, 1000000, 42L);
+
+    // Write the large parquet file
+    File dataFolder = new File(table.location(), "data");
+    File parquetFile = new File(dataFolder, FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()));
+
+    try (FileAppender<GenericData.Record> writer =
+        Parquet.write(localOutput(parquetFile)).schema(schema).build()) {
+      writer.addAll(records);
+    }
+
+    // Verify the file is actually large enough (~128MB)
+    long fileSizeBytes = parquetFile.length();
+    assertThat(fileSizeBytes)
+        .as("Generated file should be at least 50MB")
+        .isGreaterThan(50L * 1024 * 1024);
+
+    DataFile file =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withFileSizeInBytes(fileSizeBytes)
+            .withPath(parquetFile.toString())
+            .withRecordCount(records.size())
+            .build();
+
+    table.newAppend().appendFile(file).commit();
+
+    // Get direct memory usage before reading
+    long directMemoryBefore = getDirectMemoryUsed();
+
+    // Read the file using vectorized parquet reader and collect all results
+    Dataset<Row> df = spark.read().format("iceberg").load(table.location());
+    List<Row> rows = df.collectAsList();
+
+    // Get direct memory usage after reading but before cleanup
+    long directMemoryAfterRead = getDirectMemoryUsed();
+
+    // Verify we read the expected number of rows
+    assertThat(rows).as("Should contain all records").hasSize(records.size());
+
+    // Clear the collected data to release references
+    rows = null;
+    df = null;
+
+    // Force garbage collection to ensure any memory that should be released is released
+    System.gc();
+    System.gc();
+
+    // Wait a bit for GC to complete
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    // Get direct memory usage after cleanup
+    long directMemoryAfterCleanup = getDirectMemoryUsed();
+
+    // Calculate memory increases
+    long memoryIncreaseFromRead = directMemoryAfterRead - directMemoryBefore;
+    long memoryLeakAfterCleanup = directMemoryAfterCleanup - directMemoryBefore;
+
+    // Log memory usage for debugging
+    System.out.printf(
+        "Direct memory usage - Before: %d bytes, After read: %d bytes, After cleanup: %d bytes%n",
+        directMemoryBefore, directMemoryAfterRead, directMemoryAfterCleanup);
+    System.out.printf(
+        "Memory increase from read: %d bytes, Potential leak: %d bytes%n",
+        memoryIncreaseFromRead, memoryLeakAfterCleanup);
+
+    // We expect some memory to be used during reading (this verifies the test is actually testing something meaningful)
+    assertThat(memoryIncreaseFromRead)
+        .as("Reading a large file should use some direct memory")
+        .isGreaterThan(0);
+
+    // The key assertion: after cleanup, direct memory usage should return close to the initial level
+    // We allow for some small variance (1MB) due to JVM internals and other concurrent operations
+    long allowableMemoryVariance = 1024 * 1024; // 1MB
+    assertThat(memoryLeakAfterCleanup)
+        .as("Direct memory should be released after reading (potential memory leak detected)")
+        .isLessThanOrEqualTo(allowableMemoryVariance);
+  }
+
+  /**
+   * Gets the current direct memory usage by summing up all direct buffer pools.
+   */
+  private long getDirectMemoryUsed() {
+    List<BufferPoolMXBean> bufferPoolMXBeans =
+        ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+    long directMemory = 0;
+    for (BufferPoolMXBean bufferPoolMXBean : bufferPoolMXBeans) {
+      if (bufferPoolMXBean.getName().equals("direct")) {
+        directMemory += bufferPoolMXBean.getMemoryUsed();
+      }
+    }
+    return directMemory;
+  }
 }
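The getDirectMemoryUsed helper above reads the JDK's "direct" buffer pool via BufferPoolMXBean. As a standalone sketch, the same measurement around a plain direct allocation looks like this (the 8 MB size is arbitrary):

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

public class DirectMemoryProbe {
  // Sum the "direct" buffer pool, mirroring the helper in the test above.
  static long directMemoryUsed() {
    List<BufferPoolMXBean> pools =
        ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
    long used = 0;
    for (BufferPoolMXBean pool : pools) {
      if ("direct".equals(pool.getName())) {
        used += pool.getMemoryUsed();
      }
    }
    return used;
  }

  public static void main(String[] args) {
    long before = directMemoryUsed();
    ByteBuffer buf = ByteBuffer.allocateDirect(8 * 1024 * 1024); // 8 MB
    buf.put(0, (byte) 1); // touch the buffer so it is clearly in use
    long after = directMemoryUsed();
    // The pool reflects the allocation immediately; it only shrinks again
    // once the buffer is garbage collected, which is why the test above has
    // to force GC before checking for a leak.
    System.out.println(after - before >= 8 * 1024 * 1024);
  }
}
```

Note that this pool does not track Arrow's own allocator accounting, which is why the later comments switch to ArrowAllocation.rootAllocator().getAllocatedMemory() to localize the leak.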

@RussellSpitzer
Member Author

RussellSpitzer commented Aug 26, 2025

@TestTemplate
public void testReadRepeated() {
  Table table = createTable(1); // 400000
  shouldHaveFiles(table, 1);

  // Add one more small file, and one large file
  writeRecords(1, SCALE * 3);
  int i = 0;

  while (i < 100) {
    System.out.println(currentData().size());
    System.out.println("Arrow Allocations : " + ArrowAllocation.rootAllocator().getAllocatedMemory());
    i++;
  }
}

Less complicated repro: this test will OOM after ~40 iterations (default off-heap is ~4GB).

@RussellSpitzer
Copy link
Member Author

RussellSpitzer commented Aug 26, 2025

OK, doing some more investigating: we are always allocating one more VectorizedReaderBuilder than we use. I.e., if you have 1 Spark task we make 2 readers; if you have 110 tasks we make 111 readers. The final reader is never closed... trying to track down who is making it now.

I was wrong; it's more that the last column in the last reader of the last task allocates more memory than it frees.

@RussellSpitzer
Copy link
Member Author

RussellSpitzer commented Aug 26, 2025

I figured it out:

In VectorizedArrowReader

  private void allocateFieldVector(boolean dictionaryEncodedVector) {
    if (dictionaryEncodedVector) {
      allocateDictEncodedVector();
    } else {
      Field arrowField = ArrowSchemaUtil.convert(getPhysicalType(columnDescriptor, icebergField));
      if (columnDescriptor.getPrimitiveType().getOriginalType() != null) {
        allocateVectorBasedOnOriginalType(columnDescriptor.getPrimitiveType(), arrowField);
      } else {
        allocateVectorBasedOnTypeName(columnDescriptor.getPrimitiveType(), arrowField);
      }
    }
  }

This makes the assumption that all pages will have the same encoding, which is a big problem if the first page is dictionary-encoded and the following ones are not. The first pass through this function will call

allocateDictEncodedVector()

Which does this

    this.vec = field.createVector(rootAlloc);
    ((IntVector) vec).allocateNew(batchSize);

But what happens if we then read a non-dictionary-encoded page? We then go down the other path, allocateVectorBasedOnOriginalType, and hit this:

     switch (primitive.getOriginalType()) {
      case ENUM:
      case JSON:
      case UTF8:
      case BSON:
        this.vec = arrowField.createVector(rootAlloc);
        // TODO: Possibly use the uncompressed page size info to set the initial capacity
        vec.setInitialCapacity(batchSize * AVERAGE_VARIABLE_WIDTH_RECORD_SIZE);
        vec.allocateNewSafe();
        this.readType = ReadType.VARCHAR;
        this.typeWidth = UNKNOWN_WIDTH;
        break;

This creates a new vector for this.vec, causing us to lose our first vector.

This is easy enough to fix: in both of these functions we just need to clear out this.vec if it is already set.

So in the case above, dictionary-encoded pages allocate IntVectors, which are then replaced with BaseVarWidthVectors for non-encoded pages. This means we drop the previous vector and its allocation with each swap; the more pages, the worse it is.
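The swap-without-release pattern can be illustrated with a toy allocation counter. CountingVector here is a made-up stand-in for an Arrow FieldVector whose close() returns memory to the allocator; the real fix operates on this.vec in VectorizedArrowReader:

```java
import java.util.concurrent.atomic.AtomicLong;

public class VectorSwapSketch {
  // Stand-in for the allocator's outstanding-bytes accounting.
  static final AtomicLong allocatedBytes = new AtomicLong();

  static class CountingVector implements AutoCloseable {
    final long size;
    CountingVector(long size) { this.size = size; allocatedBytes.addAndGet(size); }
    @Override public void close() { allocatedBytes.addAndGet(-size); }
  }

  CountingVector vec;

  // Buggy pattern: reassigning vec drops the old allocation without freeing it.
  void allocateLeaky(long size) {
    vec = new CountingVector(size);
  }

  // Fixed pattern: release the previous vector before allocating a replacement.
  void allocateSafely(long size) {
    if (vec != null) {
      vec.close();
    }
    vec = new CountingVector(size);
  }

  public static void main(String[] args) {
    VectorSwapSketch leaky = new VectorSwapSketch();
    for (int page = 0; page < 3; page++) leaky.allocateLeaky(1024); // alternating encodings
    long leaked = allocatedBytes.get(); // 3 allocations outstanding, only 1 reachable
    leaky.vec.close();
    allocatedBytes.set(0);

    VectorSwapSketch fixed = new VectorSwapSketch();
    for (int page = 0; page < 3; page++) fixed.allocateSafely(1024);
    long retained = allocatedBytes.get(); // only the live vector remains
    fixed.vec.close();

    System.out.println(leaked + " " + retained);
  }
}
```

With the buggy pattern the outstanding bytes grow with every encoding switch, which matches the observed behavior: the more pages, the worse the leak.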

@pvary
Contributor

pvary commented Aug 27, 2025

CC: @nandorKollar

@nandorKollar
Contributor

So in the case above we have dictionary-encoded pages which allocate IntVectors which are then replaced with BaseVarWidthVectors for non-encoded pages. This means we drop the previous vector and its allocation with each swap. The more pages the worse it is.

Why did the type of the vector change from IntVectors to BaseVarWidthVectors? If we clear out "this.vec" if it is set, wouldn't this type change in the vector cause problems? Shouldn't we explicitly close this.vec if it is not null, before setting it to a new vector?

@RussellSpitzer
Member Author

Why did the type of the vector change from IntVectors to BaseVarWidthVectors?

The vector changes because dictionary-encoded pages are a sequence of ints, {1, 2, 3, 4}, that refer to entries in the dictionary, which maps each int to the actual column value: {1: "foo", 2: "bar", ...}. Other pages have literal representations of the values stored as binary: {foo, bar, bazz}. So you have to switch vector types when you alternate.

If we clear out "this.vec" if it is set, wouldn't this type change in the vector cause problems? Shouldn't we explicitly close this.vec if it is not null, before setting it to a new vector?

No. To be clear, the code has always cleared out this.vec, and we don't have correctness issues, because essentially what is happening is:

  1. Reader looks to see if it can read the page
  2. If it can't re-use the container do an allocate for the correct container

What is missing here is
2.a If I previously had a container but it cannot be re-used, clear it
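To illustrate the encoding difference behind the container switch: a dictionary-encoded page holds int indices into a per-column dictionary (hence an IntVector), while a plain page holds the values themselves (hence a variable-width vector). The data here is invented:

```java
public class DictionaryPageSketch {
  public static void main(String[] args) {
    // Dictionary built from the column's distinct values.
    String[] dictionary = {"foo", "bar", "bazz"};

    // A dictionary-encoded page: ints referencing dictionary entries,
    // naturally read into an int-typed vector.
    int[] encodedPage = {0, 1, 0, 2};

    // Decoding yields the same values a plain (non-dictionary) page would
    // store literally, which needs a variable-width vector instead.
    StringBuilder decoded = new StringBuilder();
    for (int index : encodedPage) {
      decoded.append(dictionary[index]).append(' ');
    }
    System.out.println(decoded.toString().trim());
  }
}
```

Parquet writers can fall back from dictionary encoding mid-column (e.g. when the dictionary grows too large), which is exactly the alternation that forces the reader to swap vector types.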

@nandorKollar
Contributor

Why did the type of the vector change from IntVectors to BaseVarWidthVectors?

The vector changes because Dictionary encoded pages are a sequence of ints, {1, 2, 3, 4} that refer to entries in the Dictionary which maps the int to the actual column value. {1: "foo", 2: "bar", ....}. Other pages have literal representations of the values stored as binary {foo, bar, bazz }. So you have to switch vector types when you alternate.

If we clear out "this.vec" if it is set, wouldn't this type change in the vector cause problems? Shouldn't we explicitly close the this.vec if it is not null, before setting it to a new vector?

No. To be clear, the code has always cleared out this.vec and we don't have correctness issues because essentially what is happening is:

  1. Reader looks to see if it can read the page
  2. If it can't re-use the container do an allocate for the correct container

What is missing here is 2.a If I previously had a container but it cannot be re-used, clear it

Thanks for clarifying why the type change happens, makes sense. We can't reuse the vector only when there's a switch from/to dictionary-encoded pages, right? When you mention that it is always cleared, do you mean that the value count is set to 0 in this block:

    if (reuse == null
        || (!dictEncoded && readType == ReadType.DICTIONARY)
        || (dictEncoded && readType != ReadType.DICTIONARY)) {
      allocateFieldVector(dictEncoded);
      nullabilityHolder = new NullabilityHolder(batchSize);
    } else {
      vec.setValueCount(0);
      nullabilityHolder.reset();
    }

@RussellSpitzer
Member Author

Previously it would just completely drop the reference to the previous vec, so it didn't matter. (That's the leak.) I'll raise a new PR with a fix. Also, fun fact: we already have a test THAT WOULD FAIL if we were actually tracking memory usage. See TestParquetDictionaryEncodedVectorizedReads.testMixedDictionaryNonDictionaryReads. One sec and I'll have a new PR up to show this.

@RussellSpitzer
Member Author

Rebased now that the leak is gone; removed the test speedups, which are also being done in a separate PR, #13947.

Contributor

@amogh-jahagirdar left a comment

Thank you @RussellSpitzer

@RussellSpitzer merged commit 8bcfa61 into apache:main on Sep 2, 2025
42 checks passed
@RussellSpitzer
Member Author

Finally merged! Thanks to everyone who helped with this and the associated memory-leak issue.

Thanks to everyone:
@pvary
@amogh-jahagirdar
@ebyhr
@stevenzwu
@nastra
