Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.ThreadWatchdog;
Expand Down Expand Up @@ -269,8 +270,10 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile

Channel channel = connectFuture.channel();
if (channel == null) {
ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause());
throw new IOException(connectFuture.cause());
final var cause = connectFuture.cause();
logger.warn(Strings.format("failed to initiate channel to [%s]", node), cause);
ExceptionsHelper.maybeDieOnAnotherThread(cause);
throw new IOException(cause);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you need a callback. Checking null on channel might be not enough.
connectFuture.addListener(f -> if (f.isSuccess() == false) { log.error...})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add that listener here:

I don't think we want to log every such failure, and definitely not at error. The logging we have today is enough for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait sorry I get what you mean: if channel == null we should still wait for connectFuture to complete before logging the error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. addListener(connectFuture, connectContext); should be enough. if (channel == null) block is confusing, channel is undefined until future is resolved. We should pass only connectFuture, not channel, to the Netty4TcpChannel, once connectFuture is resolved Netty4TcpChannel should update it's own channel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

channel is undefined

I meant there are few steps that can go wrong - channel initialization and registration and failures are dispatched at different thread either event-loop or global-executor. In all cases channel would be closed forcibly. But using channel that failed to initialize means we don't have our handlers attached.


Netty4TcpChannel nettyChannel = new Netty4TcpChannel(
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,10 @@ public static <T extends Throwable> Optional<T> unwrapCausesAndSuppressed(Throwa
return Optional.of((T) cause);
}

if (cause == null) {
return Optional.empty();
}

final Queue<Throwable> queue = new LinkedList<>();
queue.add(cause);
final Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
Expand Down
12 changes: 12 additions & 0 deletions server/src/test/java/org/elasticsearch/ExceptionsHelperTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.ExceptionsHelper.maybeError;
import static org.elasticsearch.ExceptionsHelper.unwrapCausesAndSuppressed;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down Expand Up @@ -494,4 +496,14 @@ private static String compressPackages(String className) {
ExceptionsHelper.compressPackages(s, className);
return s.toString();
}

public void testUnwrapCausesAndSuppressedNullHandling() {
final AtomicBoolean predicateCalled = new AtomicBoolean();
assertEquals(Optional.empty(), unwrapCausesAndSuppressed(null, value -> {
assertNull(value);
assertTrue(predicateCalled.compareAndSet(false, true)); // called no more than once
return randomBoolean();
}));
assertTrue(predicateCalled.get()); // called at least once
}
}