diff --git a/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/federation/RepositoryFederatedService.java b/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/federation/RepositoryFederatedService.java index 936733bfa58..9c8e7a47e7e 100644 --- a/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/federation/RepositoryFederatedService.java +++ b/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/federation/RepositoryFederatedService.java @@ -16,6 +16,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; @@ -62,6 +65,10 @@ private class BatchingServiceIteration extends JoinExecutorBase { private final Service service; + private final ExecutorService threadExecutor = Executors.newSingleThreadExecutor(); + + private final Future querySubmissionTask; + /** * @param inputBindings * @throws QueryEvaluationException @@ -71,13 +78,26 @@ public BatchingServiceIteration(CloseableIteration inputBindings, super(inputBindings, null, EmptyBindingSet.getInstance()); this.blockSize = blockSize; this.service = service; - run(); + + // Set up a consumer task to send HTTP requests in parallel. This must be done in a + // separate thread, because submitting HTTP requests may block if the HTTP pool is full. + // In that case, we would enter a deadlock, with the main thread waiting for both the + // pool to yield, and the consumer of the bindings to read from the queue. + // See: https://github.com/eclipse-rdf4j/rdf4j/discussions/5120 + // Test case: https://github.com/tkuhn/rdf4j-timeout-test + try { + querySubmissionTask = threadExecutor.submit(this::run); + } catch (Exception e) { + throw new QueryEvaluationException("Failed to start a thread for batched federated query submission", + e); + } } @Override protected void handleBindings() throws Exception { + // Note: any exceptions here will be intercepted by the caller and tossed asynchronously + // via the rightQueue. while (!isClosed() && leftIter.hasNext()) { - ArrayList blockBindings = new ArrayList<>(blockSize); for (int i = 0; i < blockSize; i++) { if (!leftIter.hasNext()) { @@ -87,9 +107,19 @@ protected void handleBindings() throws Exception { } CloseableIteration materializedIter = new CollectionIteration<>( blockBindings); + // evaluateInternal is BLOCKING if the HTTP pool is exhausted addResult(evaluateInternal(service, materializedIter, service.getBaseURI())); } } + + @Override + public void handleClose() throws QueryEvaluationException { + super.handleClose(); + if (querySubmissionTask != null) { + querySubmissionTask.cancel(true); + } + threadExecutor.shutdownNow(); + } } /** @@ -631,4 +661,4 @@ private static void closeQuietly(RepositoryConnection conn) { logger.debug("Details: ", t); } } -} +} \ No newline at end of file