Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
Expand Down Expand Up @@ -62,6 +65,10 @@ private class BatchingServiceIteration extends JoinExecutorBase<BindingSet> {

private final Service service;

private final ExecutorService threadExecutor = Executors.newSingleThreadExecutor();

private final Future<?> querySubmissionTask;

/**
* @param inputBindings
* @throws QueryEvaluationException
Expand All @@ -71,13 +78,26 @@ public BatchingServiceIteration(CloseableIteration<BindingSet> inputBindings,
super(inputBindings, null, EmptyBindingSet.getInstance());
this.blockSize = blockSize;
this.service = service;
run();

// Set up a consumer task to send HTTP requests in parallel. This must be done in a
// separate thread, because submitting HTTP requests may block if the HTTP pool is full.
// In that case, we would enter a deadlock, with the main thread waiting for both the
// pool to yield, and the consumer of the bindings to read from the queue.
// See: https://github.com/eclipse-rdf4j/rdf4j/discussions/5120
// Test case: https://github.com/tkuhn/rdf4j-timeout-test
try {
querySubmissionTask = threadExecutor.submit(this::run);
} catch (Exception e) {
throw new QueryEvaluationException("Failed to start a thread for batched federated query submission",
e);
}
}

@Override
protected void handleBindings() throws Exception {
// Note: any exceptions here will be intercepted by the caller and tossed asynchronously
// via the rightQueue.
while (!isClosed() && leftIter.hasNext()) {

ArrayList<BindingSet> blockBindings = new ArrayList<>(blockSize);
for (int i = 0; i < blockSize; i++) {
if (!leftIter.hasNext()) {
Expand All @@ -87,9 +107,19 @@ protected void handleBindings() throws Exception {
}
CloseableIteration<BindingSet> materializedIter = new CollectionIteration<>(
blockBindings);
// evaluateInternal is BLOCKING if the HTTP pool is exhausted
addResult(evaluateInternal(service, materializedIter, service.getBaseURI()));
}
}

@Override
public void handleClose() throws QueryEvaluationException {
super.handleClose();
if (querySubmissionTask != null) {
querySubmissionTask.cancel(true);
}
threadExecutor.shutdownNow();
}
}

/**
Expand Down Expand Up @@ -631,4 +661,4 @@ private static void closeQuietly(RepositoryConnection conn) {
logger.debug("Details: ", t);
}
}
}
}
Loading