From ed3e249f0e30e6d259d597c96fb0c9ca22e2b4f0 Mon Sep 17 00:00:00 2001 From: Ostrzyciel Date: Mon, 22 Sep 2025 18:48:53 +0200 Subject: [PATCH 1/4] GH-5291 Add asynchronous fsync to LuceneSail --- .../eclipse/rdf4j/sail/lucene/LuceneSail.java | 15 + .../impl/DelayedSyncDirectoryWrapper.java | 131 +++++++++ .../rdf4j/sail/lucene/impl/LuceneIndex.java | 32 ++- .../impl/DelayedSyncDirectoryWrapperTest.java | 267 ++++++++++++++++++ .../lucene/impl/LuceneDelayedFsyncTest.java | 58 ++++ 5 files changed, 498 insertions(+), 5 deletions(-) create mode 100644 core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java create mode 100644 core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java create mode 100644 core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneDelayedFsyncTest.java diff --git a/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java b/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java index f2d96ecc435..31d140b3889 100644 --- a/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java +++ b/core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java @@ -289,6 +289,21 @@ public class LuceneSail extends NotifyingSailWrapper { */ public static final String LUCENE_RAMDIR_KEY = "useramdir"; + /** + * Set the key "fsyncInterval=<t>" as sail parameter to configure the interval in milliseconds in which fsync + * is called on the Lucene index. Set to 0 or a negative value to call fsync synchronously after each operation. + * Default is 0. Setting this parameter to a positive value will improve performance for frequent writes, but may + * cause the loss of the last few operations in case of a crash. 
+ */ + public static final String FSYNC_INTERVAL_KEY = "fsyncInterval"; + + /** + * Set the key "fsyncMaxPendingFiles=<n>" as sail parameter to configure the maximum number of files pending + * to be fsynced. When this number is reached, a fsync is forced to limit memory usage. Default is 5000. This + * parameter only has an effect when {@link #FSYNC_INTERVAL_KEY} is set to a positive value. + */ + public static final String FSYNC_MAX_PENDING_FILES_KEY = "fsyncMaxPendingFiles"; + /** * Set the key "defaultNumDocs=<n>" as sail parameter to limit the maximum number of documents to return from * a search query. The default is to return all documents. NB: this may involve extra cost for some SearchIndex diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java new file mode 100644 index 00000000000..fdfc5eb4fb2 --- /dev/null +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java @@ -0,0 +1,131 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lucene.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper around a Lucene Directory that batches sync and metadata sync calls to be executed at a fixed interval. + * + * @author Piotr Sowiński + */ +class DelayedSyncDirectoryWrapper extends FilterDirectory { + + final private Logger logger = LoggerFactory.getLogger(getClass()); + + final private ScheduledExecutorService scheduler; + + final private AtomicBoolean needsMetadataSync = new AtomicBoolean(false); + + final private AtomicReference lastSyncException = new AtomicReference<>(null); + + final private HashSet pendingSyncs = new HashSet<>(); + + final private int maxPendingSyncs; + + /** + * Creates a new instance of LuceneDirectoryWrapper. 
+ * + * @param in the underlying directory + * @param fsyncInterval the interval in milliseconds after which a fsync is performed + * @param maxPendingSyncs the maximum number of pending syncs to accumulate before forcing a sync + */ + DelayedSyncDirectoryWrapper(Directory in, long fsyncInterval, int maxPendingSyncs) { + super(in); + assert fsyncInterval > 0; + assert maxPendingSyncs > 0; + this.maxPendingSyncs = maxPendingSyncs; + scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleAtFixedRate( + this::doSync, + fsyncInterval, + fsyncInterval, + TimeUnit.MILLISECONDS + ); + } + + private void doSync() { + List toSync; + synchronized (pendingSyncs) { + toSync = new ArrayList<>(pendingSyncs); + pendingSyncs.clear(); + } + if (!toSync.isEmpty()) { + try { + super.sync(toSync); + } catch (IOException e) { + lastSyncException.set(e); + logger.error("IO error during a periodic sync of Lucene index files", e); + } + } + if (this.needsMetadataSync.getAndSet(false)) { + try { + super.syncMetaData(); + } catch (IOException e) { + lastSyncException.set(e); + logger.error("IO error during a periodic sync of Lucene index metadata", e); + } + } + } + + @Override + public void sync(Collection names) throws IOException { + final IOException ex = lastSyncException.getAndSet(null); + if (ex != null) { + // Rethrow the last exception if there was one. + // This will fail the current transaction, and not the one that caused the original exception. + // But there is no other way to notify the caller of the error, as the sync is done asynchronously. 
+ throw ex; + } + synchronized (pendingSyncs) { + pendingSyncs.addAll(names); + if (pendingSyncs.size() >= maxPendingSyncs) { + // If we have accumulated too many pending syncs, do a sync right away + // to avoid excessive memory usage + doSync(); + } + } + } + + @Override + public void syncMetaData() throws IOException { + needsMetadataSync.set(true); + } + + @Override + public void close() throws IOException { + // Finish the current sync task, if in progress and then shut down + try { + scheduler.shutdown(); + } finally { + // Do a final sync of any remaining files + try { + doSync(); + } finally { + super.close(); + } + } + } +} diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java index 5999a91cbe8..cb59fb0597c 100644 --- a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java @@ -25,7 +25,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -227,6 +226,23 @@ protected Directory createDirectory(Properties parameters) throws IOException { throw new IOException("No luceneIndex set, and no '" + LuceneSail.LUCENE_DIR_KEY + "' or '" + LuceneSail.LUCENE_RAMDIR_KEY + "' parameter given. 
"); } + long fsyncInterval = 0; + int maxPendingSyncs = 5000; + try { + var param = parameters.getProperty(LuceneSail.FSYNC_INTERVAL_KEY, "0"); + fsyncInterval = Long.parseLong(param); + } catch (NumberFormatException e) { + logger.warn("Ignoring invalid {} parameter: {}", LuceneSail.FSYNC_INTERVAL_KEY, e.getMessage()); + } + try { + var param = parameters.getProperty(LuceneSail.FSYNC_MAX_PENDING_FILES_KEY, "5000"); + maxPendingSyncs = Integer.parseInt(param); + } catch (NumberFormatException e) { + logger.warn("Ignoring invalid {} parameter: {}", LuceneSail.FSYNC_MAX_PENDING_FILES_KEY, e.getMessage()); + } + if (fsyncInterval > 0) { + dir = new DelayedSyncDirectoryWrapper(dir, fsyncInterval, maxPendingSyncs); + } return dir; } @@ -385,10 +401,16 @@ public void shutDown() throws IOException { } } finally { try { - IndexWriter toCloseIndexWriter = indexWriter; - indexWriter = null; - if (toCloseIndexWriter != null) { - toCloseIndexWriter.close(); + try { + IndexWriter toCloseIndexWriter = indexWriter; + indexWriter = null; + if (toCloseIndexWriter != null) { + toCloseIndexWriter.close(); + } + } finally { + // Close the directory -- if asynchronous fsync is used, this will clean + // up the scheduler thread too. + directory.close(); } } finally { if (!exceptions.isEmpty()) { diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java new file mode 100644 index 00000000000..67846975aa1 --- /dev/null +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java @@ -0,0 +1,267 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lucene.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.RAMDirectory; +import org.junit.jupiter.api.Test; + +/** + * @author Piotr Sowiński + */ +public class DelayedSyncDirectoryWrapperTest { + + private static final class TrackingDirectory extends FilterDirectory { + private AtomicInteger syncCount = new AtomicInteger(0); + private AtomicInteger metaSyncCount = new AtomicInteger(0); + private AtomicInteger closeCount = new AtomicInteger(0); + + TrackingDirectory(Directory in) { + super(in); + } + + @Override + public void sync(Collection names) throws IOException { + syncCount.getAndIncrement(); + super.sync(names); + } + + @Override + public void syncMetaData() throws IOException { + metaSyncCount.getAndIncrement(); + super.syncMetaData(); + } + + @Override + public void close() throws IOException { + closeCount.getAndIncrement(); + super.close(); + } + + public int getSyncCount() { + return syncCount.get(); + } + + public int getMetaSyncCount() { + return metaSyncCount.get(); + } + + public int getCloseCount() { + return closeCount.get(); + } + } + + @Test + public void testSyncData() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 
100); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.sync(List.of("file1", "file2")); + + // The sync should be delayed, so the count should still be 0 + assertEquals(0, dir.getSyncCount()); + + // Wait for more than the fsync interval to allow the scheduled task to run + waitFor(700); + assertEquals(1, dir.getSyncCount()); + // Meta sync should still be 0 + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.close(); + + // No additional syncs should have occurred after close + assertEquals(1, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testSyncMetaData() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.syncMetaData(); + + // The meta sync should be delayed, so the count should still be 0 + assertEquals(0, dir.getMetaSyncCount()); + + // Wait for more than the fsync interval to allow the scheduled task to run + waitFor(700); + assertEquals(1, dir.getMetaSyncCount()); + // Regular sync should still be 0 + assertEquals(0, dir.getSyncCount()); + + delayedDir.close(); + + // No additional syncs should have occurred after close + assertEquals(0, dir.getSyncCount()); + assertEquals(1, dir.getMetaSyncCount()); + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testSyncMixed() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.sync(List.of("file1", "file2")); + delayedDir.sync(List.of("file2", "file456")); + delayedDir.syncMetaData(); + delayedDir.syncMetaData(); + delayedDir.syncMetaData(); + + // The syncs 
should be delayed, so the counts should still be 0 + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + // Wait for more than the fsync interval to allow the scheduled task to run + waitFor(700); + assertEquals(1, dir.getSyncCount()); + assertEquals(1, dir.getMetaSyncCount()); + + delayedDir.sync(List.of("file2", "file456")); + delayedDir.syncMetaData(); + + waitFor(700); + assertEquals(2, dir.getSyncCount()); + assertEquals(2, dir.getMetaSyncCount()); + + // Wait again to ensure no extra syncs occur + waitFor(700); + assertEquals(2, dir.getSyncCount()); + assertEquals(2, dir.getMetaSyncCount()); + + delayedDir.close(); + + // No additional syncs should have occurred after close + assertEquals(2, dir.getSyncCount()); + assertEquals(2, dir.getMetaSyncCount()); + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testSyncMixed_afterClose() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.sync(List.of("file1", "file2")); + delayedDir.sync(List.of("file2", "file456")); + delayedDir.syncMetaData(); + delayedDir.syncMetaData(); + delayedDir.syncMetaData(); + + assertEquals(0, dir.getSyncCount()); + assertEquals(0, dir.getMetaSyncCount()); + + delayedDir.close(); + + // The syncs should be executed on close + assertEquals(1, dir.getSyncCount()); + assertEquals(1, dir.getMetaSyncCount()); + assertEquals(1, dir.getCloseCount()); + } + + @Test + public void testCloseOnIndexShutDown() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 500, 100); + final var index = new LuceneIndex(delayedDir, new StandardAnalyzer()); + assertEquals(0, dir.getCloseCount()); + + index.shutDown(); + + assertEquals(1, dir.getCloseCount()); + } + + @Test + 
public void testRethrowLastSyncException() throws IOException { + final var dir = new FilterDirectory(new RAMDirectory()) { + @Override + public void sync(Collection names) throws IOException { + throw new IOException("Simulated IO exception during sync"); + } + + @Override + public void syncMetaData() throws IOException { + throw new IOException("Simulated IO exception during syncMetaData"); + } + }; + final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 300, 100); + + // This should not throw immediately + delayedDir.sync(List.of("file1", "file2")); + waitFor(500); + try { + delayedDir.syncMetaData(); + } catch (IOException e) { + // The exception from the previous sync should be rethrown here + assertEquals("Simulated IO exception during sync", e.getMessage()); + } + + waitFor(500); + try { + delayedDir.sync(List.of("file3")); + } catch (IOException e) { + // The exception from the previous syncMetaData should be rethrown here + assertEquals("Simulated IO exception during syncMetaData", e.getMessage()); + } + } + + @Test + public void testSyncIfOverSyncLimit() throws IOException { + final var dir = new TrackingDirectory(new RAMDirectory()); + final var delayedDir = new DelayedSyncDirectoryWrapper( + dir, + 100_000, // Large interval to prevent scheduled syncs during the test + 5 // Low max pending syncs to trigger sync quickly + ); + + assertEquals(0, dir.getSyncCount()); + delayedDir.sync(List.of("file1", "file2", "file3", "file4")); + // Still no sync should have occurred, 4 < 5 + assertEquals(0, dir.getSyncCount()); + // Sync the same files again, should still be 4 unique files + delayedDir.sync(List.of("file1", "file2", "file3", "file4")); + assertEquals(0, dir.getSyncCount()); + // Sync one more file, should trigger the sync as we now have 5 unique files + delayedDir.sync(List.of("file5")); + assertEquals(1, dir.getSyncCount()); + } + + private void waitFor(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + } + } +} diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneDelayedFsyncTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneDelayedFsyncTest.java new file mode 100644 index 00000000000..72276cae57a --- /dev/null +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneDelayedFsyncTest.java @@ -0,0 +1,58 @@ +/******************************************************************************* + * Copyright (c) 2025 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.sail.lucene.impl; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.store.NIOFSDirectory; +import org.eclipse.rdf4j.sail.lucene.LuceneSail; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Test to verify that when fsync interval is set, the index uses DelayedSyncDirectoryWrapper. It also checks if the + * index still works correctly while using the wrapper. 
+ * + * @author Piotr Sowiński + */ +public class LuceneDelayedFsyncTest extends AbstractGenericLuceneTest { + + @TempDir + public File dataDir; + + private LuceneIndex index; + + @Override + protected void configure(LuceneSail sail) throws IOException { + index = new LuceneIndex(new NIOFSDirectory(dataDir.toPath()), new StandardAnalyzer()); + var params = new Properties(); + params.setProperty(LuceneSail.FSYNC_INTERVAL_KEY, "5000"); // 5 seconds + params.setProperty(LuceneSail.LUCENE_DIR_KEY, dataDir.getAbsolutePath()); + try { + index.initialize(params); + } catch (Exception e) { + throw new RuntimeException(e); + } + sail.setLuceneIndex(index); + } + + @Test + public void testIndexSettings() { + assertNotNull(index); + assertThat(index.getDirectory()).isInstanceOf(DelayedSyncDirectoryWrapper.class); + } +} From 8304a6fd7ba30815f85846059fe9ae64f1610fb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Fri, 3 Oct 2025 21:04:03 +0200 Subject: [PATCH 2/4] improve resilience --- .../impl/DelayedSyncDirectoryWrapper.java | 143 ++++++++++++++---- .../rdf4j/sail/lucene/impl/LuceneIndex.java | 8 +- 2 files changed, 121 insertions(+), 30 deletions(-) diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java index fdfc5eb4fb2..5029474d45a 100644 --- a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java @@ -16,13 +16,14 @@ import java.util.HashSet; import java.util.List; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; +import org.eclipse.rdf4j.sail.SailException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,16 +36,20 @@ class DelayedSyncDirectoryWrapper extends FilterDirectory { final private Logger logger = LoggerFactory.getLogger(getClass()); - final private ScheduledExecutorService scheduler; + final private ScheduledThreadPoolExecutor scheduler; final private AtomicBoolean needsMetadataSync = new AtomicBoolean(false); - final private AtomicReference lastSyncException = new AtomicReference<>(null); + final private AtomicReference lastSyncThrowable = new AtomicReference<>(null); final private HashSet pendingSyncs = new HashSet<>(); final private int maxPendingSyncs; + private final Object syncMonitor = new Object(); + + private boolean closed = false; + /** * Creates a new instance of LuceneDirectoryWrapper. * @@ -54,71 +59,155 @@ class DelayedSyncDirectoryWrapper extends FilterDirectory { */ DelayedSyncDirectoryWrapper(Directory in, long fsyncInterval, int maxPendingSyncs) { super(in); + assert fsyncInterval > 0; assert maxPendingSyncs > 0; + this.maxPendingSyncs = maxPendingSyncs; - scheduler = Executors.newScheduledThreadPool(1); - scheduler.scheduleAtFixedRate( - this::doSync, + + // Use a daemon thread so the scheduler does not prevent JVM shutdown. + this.scheduler = new ScheduledThreadPoolExecutor(1, r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setName("rdf4j-lucene-sync-" + t.getId()); + return t; + }); + + // Help GC by removing cancelled tasks from the queue. 
+ this.scheduler.setRemoveOnCancelPolicy(true); + + this.scheduler.scheduleAtFixedRate( + () -> { + try { + doSync(); + } catch (Throwable e) { + // keep scheduling even if Errors occur + logger.error(e.getClass().getSimpleName() + " during a periodic sync of Lucene index", e); + // Throwable is recorded and rethrown on next sync()/syncMetaData() call by checkException(). + try { + Thread.sleep(10); // slight throttle to avoid busy looping + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + }, fsyncInterval, fsyncInterval, TimeUnit.MILLISECONDS ); } - private void doSync() { + private void doSync() throws IOException { + logger.debug("Performing periodic sync of Lucene index"); List toSync; synchronized (pendingSyncs) { toSync = new ArrayList<>(pendingSyncs); pendingSyncs.clear(); } - if (!toSync.isEmpty()) { - try { - super.sync(toSync); - } catch (IOException e) { - lastSyncException.set(e); - logger.error("IO error during a periodic sync of Lucene index files", e); + + boolean needsMetaDataSync = this.needsMetadataSync.get(); + + try { + if (toSync.isEmpty() && !this.needsMetadataSync.get()) { + logger.debug("Nothing to sync"); + // Nothing to sync + return; } - } - if (this.needsMetadataSync.getAndSet(false)) { - try { - super.syncMetaData(); - } catch (IOException e) { - lastSyncException.set(e); - logger.error("IO error during a periodic sync of Lucene index metadata", e); + + synchronized (syncMonitor) { + if (!toSync.isEmpty()) { + try { + logger.debug("Syncing files"); + super.sync(toSync); + } catch (Throwable e) { + logger.error(e.getClass().getSimpleName() + " during a periodic sync of Lucene index files", e); + throw e; + } + } + needsMetaDataSync = this.needsMetadataSync.getAndSet(false); + if (needsMetaDataSync) { + try { + logger.debug("Syncing metadata"); + super.syncMetaData(); + } catch (Throwable e) { + logger.error(e.getClass().getSimpleName() + " during a periodic sync of Lucene index metadata", + e); + throw 
e; + } + } + } + } catch (Throwable t) { + lastSyncThrowable.set(t); + synchronized (pendingSyncs) { + pendingSyncs.addAll(toSync); + if (needsMetaDataSync) { + needsMetadataSync.set(true); + } } + throw t; } + } @Override public void sync(Collection names) throws IOException { - final IOException ex = lastSyncException.getAndSet(null); - if (ex != null) { - // Rethrow the last exception if there was one. - // This will fail the current transaction, and not the one that caused the original exception. - // But there is no other way to notify the caller of the error, as the sync is done asynchronously. - throw ex; + checkException(); + if (closed) { + throw new SailException("DelayedSyncDirectoryWrapper is closed"); } + + boolean doImmediateSync = false; synchronized (pendingSyncs) { pendingSyncs.addAll(names); if (pendingSyncs.size() >= maxPendingSyncs) { // If we have accumulated too many pending syncs, do a sync right away // to avoid excessive memory usage - doSync(); + doImmediateSync = true; } } + if (doImmediateSync) { + doSync(); + } } @Override public void syncMetaData() throws IOException { + checkException(); + if (closed) { + throw new SailException("DelayedSyncDirectoryWrapper is closed"); + } needsMetadataSync.set(true); } + private void checkException() throws IOException { + final Throwable t = lastSyncThrowable.getAndSet(null); + if (t != null) { + // Rethrow the last exception if there was one. + // This will fail the current transaction, and not the one that caused the original exception. + // But there is no other way to notify the caller of the error, as the sync is done asynchronously. 
+ + if (t instanceof IOException) { + throw ((IOException) t); + } else { + throw new SailException(t); + } + } + } + @Override public void close() throws IOException { + closed = true; + // Finish the current sync task, if in progress and then shut down try { scheduler.shutdown(); + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + logger.error("Failed to shut down Lucene directory sync scheduler within 10s"); + throw new SailException("Failed to shut down Lucene directory sync scheduler within 10s"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SailException(e); } finally { // Do a final sync of any remaining files try { diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java index cb59fb0597c..1b80da33bcc 100644 --- a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java @@ -408,9 +408,11 @@ public void shutDown() throws IOException { toCloseIndexWriter.close(); } } finally { - // Close the directory -- if asynchronous fsync is used, this will clean - // up the scheduler thread too. - directory.close(); + if (directory != null) { + // Close the directory -- if asynchronous fsync is used, this will clean + // up the scheduler thread too. 
+ directory.close(); + } } } finally { if (!exceptions.isEmpty()) { From 72f4a5548647a7423da837ced0d678904f8c14c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Fri, 3 Oct 2025 22:36:44 +0200 Subject: [PATCH 3/4] improve resilience --- .../impl/DelayedSyncDirectoryWrapper.java | 54 +++++++++---- .../impl/DelayedSyncDirectoryWrapperTest.java | 77 +++++++++++++++---- 2 files changed, 101 insertions(+), 30 deletions(-) diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java index 5029474d45a..5e0b7bf6a77 100644 --- a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java @@ -10,9 +10,11 @@ *******************************************************************************/ package org.eclipse.rdf4j.sail.lucene.impl; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.Executors; @@ -105,27 +107,20 @@ private void doSync() throws IOException { pendingSyncs.clear(); } - boolean needsMetaDataSync = this.needsMetadataSync.get(); + boolean metaRequestedInitial = this.needsMetadataSync.get(); + boolean metaToProcess = false; try { - if (toSync.isEmpty() && !this.needsMetadataSync.get()) { + if (toSync.isEmpty() && !metaRequestedInitial) { logger.debug("Nothing to sync"); // Nothing to sync return; } synchronized (syncMonitor) { - if (!toSync.isEmpty()) { - try { - logger.debug("Syncing files"); - super.sync(toSync); - } catch (Throwable e) { - logger.error(e.getClass().getSimpleName() + " during a periodic sync of Lucene index files", e); - throw e; - } - } - needsMetaDataSync 
= this.needsMetadataSync.getAndSet(false); - if (needsMetaDataSync) { + // Process metadata first if requested + metaToProcess = this.needsMetadataSync.getAndSet(false); + if (metaToProcess) { try { logger.debug("Syncing metadata"); super.syncMetaData(); @@ -135,12 +130,40 @@ private void doSync() throws IOException { throw e; } } + + if (!toSync.isEmpty()) { + try { + logger.debug("Syncing files"); + super.sync(toSync); + } catch (Throwable e) { + // Lucene files may be merged/removed between scheduling and sync. + // Treat missing files as benign and attempt per-file sync, ignoring those missing. + if (e instanceof java.nio.file.NoSuchFileException || e instanceof FileNotFoundException) { + for (String name : toSync) { + try { + super.sync(Collections.singleton(name)); + } catch (java.nio.file.NoSuchFileException | FileNotFoundException ignore) { + // File disappeared before fsync: safe to ignore + } catch (Throwable t) { + logger.error(t.getClass().getSimpleName() + + " during a periodic sync of Lucene index files (per-file)", t); + throw t; + } + } + // Consider sync successful when only missing files were encountered. 
+ } else { + logger.error(e.getClass().getSimpleName() + " during a periodic sync of Lucene index files", + e); + throw e; + } + } + } } } catch (Throwable t) { lastSyncThrowable.set(t); synchronized (pendingSyncs) { pendingSyncs.addAll(toSync); - if (needsMetaDataSync) { + if (metaToProcess) { needsMetadataSync.set(true); } } @@ -172,11 +195,12 @@ public void sync(Collection names) throws IOException { @Override public void syncMetaData() throws IOException { + // Request a metadata sync even if a previous error is pending + needsMetadataSync.set(true); checkException(); if (closed) { throw new SailException("DelayedSyncDirectoryWrapper is closed"); } - needsMetadataSync.set(true); } private void checkException() throws IOException { diff --git a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java index 67846975aa1..d0423b8cdb0 100644 --- a/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java +++ b/core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapperTest.java @@ -11,10 +11,12 @@ package org.eclipse.rdf4j.sail.lucene.impl; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.standard.StandardAnalyzer; @@ -203,37 +205,82 @@ public void testCloseOnIndexShutDown() throws IOException { } @Test - public void testRethrowLastSyncException() throws IOException { + public void testRethrowLastSyncException() throws IOException, InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); final var dir = new 
FilterDirectory(new RAMDirectory()) { @Override public void sync(Collection names) throws IOException { - throw new IOException("Simulated IO exception during sync"); + try { + throw new IOException("Simulated IO exception during sync"); + } finally { + latch.countDown(); + } } @Override public void syncMetaData() throws IOException { - throw new IOException("Simulated IO exception during syncMetaData"); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try { + throw new IOException("Simulated IO exception during syncMetaData"); + } finally { + latch2.countDown(); + } } }; final var delayedDir = new DelayedSyncDirectoryWrapper(dir, 300, 100); // This should not throw immediately delayedDir.sync(List.of("file1", "file2")); - waitFor(500); - try { - delayedDir.syncMetaData(); - } catch (IOException e) { - // The exception from the previous sync should be rethrown here - assertEquals("Simulated IO exception during sync", e.getMessage()); + latch.await(); + boolean exceptionThrown = false; + for (int i = 0; i < 10; i++) { + Thread.sleep(10); + try { + delayedDir.syncMetaData(); + } catch (IOException e) { + if (!"Simulated IO exception during sync".equals(e.getMessage())) { + if (i < 9) { + continue; + } + } + // The exception from the previous sync should be rethrown here + assertEquals("Simulated IO exception during sync", e.getMessage()); + exceptionThrown = true; + } + + } + if (!exceptionThrown) { + fail("Expected IOException was not thrown"); } - waitFor(500); - try { - delayedDir.sync(List.of("file3")); - } catch (IOException e) { - // The exception from the previous syncMetaData should be rethrown here - assertEquals("Simulated IO exception during syncMetaData", e.getMessage()); + latch2.await(); + + exceptionThrown = false; + for (int i = 0; i < 10; i++) { + Thread.sleep(10); + try { + delayedDir.sync(List.of("file3")); + } catch (IOException e) { + if (!"Simulated IO exception during 
syncMetaData".equals(e.getMessage())) { + if (i < 9) { + continue; + } + } + // The exception from the previous syncMetaData should be rethrown here + assertEquals("Simulated IO exception during syncMetaData", e.getMessage()); + exceptionThrown = true; + } + } + + if (!exceptionThrown) { + fail("Expected IOException was not thrown"); } + } @Test From 41bcc22cf28d9ed7b65fa2d9d286d0bcd708ee31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ha=CC=8Avard=20Ottestad?= Date: Fri, 3 Oct 2025 23:21:24 +0200 Subject: [PATCH 4/4] improve resilience --- .../sail/lucene/impl/DelayedSyncDirectoryWrapper.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java index 5e0b7bf6a77..81732512c58 100644 --- a/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java +++ b/core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/DelayedSyncDirectoryWrapper.java @@ -142,8 +142,11 @@ private void doSync() throws IOException { for (String name : toSync) { try { super.sync(Collections.singleton(name)); - } catch (java.nio.file.NoSuchFileException | FileNotFoundException ignore) { - // File disappeared before fsync: safe to ignore + } catch (java.nio.file.NoSuchFileException | FileNotFoundException logged) { +// logger.warn( +// "Could not sync file {}, it seems to have been removed before it could be synced. Probably fine.", +// name, logged); + throw logged; } catch (Throwable t) { logger.error(t.getClass().getSimpleName() + " during a periodic sync of Lucene index files (per-file)", t);