Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132243.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132243
summary: Fix `NullPointerException` in transport trace logger
area: Network
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.IOUtils;
Expand All @@ -30,7 +29,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
try {
String logMessage = format(channel, message, "READ");
logger.trace(logMessage);
} catch (IOException e) {
} catch (Exception e) {
logger.warn("an exception occurred formatting a READ trace message", e);
}
}
Expand All @@ -41,7 +40,7 @@ static void logInboundMessage(TcpChannel channel, InboundMessage message) {
try {
String logMessage = format(channel, message, "READ");
logger.trace(logMessage);
} catch (IOException e) {
} catch (Exception e) {
logger.warn("an exception occurred formatting a READ trace message", e);
}
}
Expand All @@ -57,7 +56,7 @@ static void logOutboundMessage(TcpChannel channel, BytesReference message) {
BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE);
String logMessage = format(channel, withoutHeader, "WRITE");
logger.trace(logMessage);
} catch (IOException e) {
} catch (Exception e) {
logger.warn("an exception occurred formatting a WRITE trace message", e);
}
}
Expand Down Expand Up @@ -111,55 +110,32 @@ private static String format(TcpChannel channel, BytesReference message, String
return sb.toString();
}

private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
private static String format(TcpChannel channel, InboundMessage message, String event) {
final StringBuilder sb = new StringBuilder();
sb.append(channel);

if (message.isPing()) {
sb.append(" [ping]").append(' ').append(event).append(": ").append(6).append('B');
} else {
boolean success = false;
Header header = message.getHeader();
int networkMessageSize = header.getNetworkMessageSize();
int messageLengthWithHeader = HEADER_SIZE + networkMessageSize;
StreamInput streamInput = message.openOrGetStreamInput();
try {
final long requestId = header.getRequestId();
final boolean isRequest = header.isRequest();
final String type = isRequest ? "request" : "response";
final String version = header.getVersion().toString();
sb.append(" [length: ").append(messageLengthWithHeader);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);
final long requestId = header.getRequestId();
final boolean isRequest = header.isRequest();
final String type = isRequest ? "request" : "response";
final String version = header.getVersion().toString();
sb.append(" [length: ").append(messageLengthWithHeader);
sb.append(", request id: ").append(requestId);
sb.append(", type: ").append(type);
sb.append(", version: ").append(version);

// TODO: Maybe Fix for BWC
if (header.needsToReadVariableHeader() == false && isRequest) {
sb.append(", action: ").append(header.getActionName());
}
sb.append(']');
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
success = true;
} finally {
if (success) {
IOUtils.close(streamInput);
} else {
IOUtils.closeWhileHandlingException(streamInput);
}
// TODO: Maybe Fix for BWC
if (header.needsToReadVariableHeader() == false && isRequest) {
sb.append(", action: ").append(header.getActionName());
}
sb.append(']');
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
}
return sb.toString();
}

private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
try {
return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput);
} catch (IllegalArgumentException e) {
throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
}
} else {
return streamInput;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,34 @@ public void testLoggingHandler() throws IOException {
}
}

public void testLoggingHandlerWithExceptionMessage() {
final String readPattern = ".*\\[length: \\d+"
+ ", request id: \\d+"
+ ", type: request"
+ ", version: .*"
+ " READ: \\d+B";

final MockLog.LoggingExpectation readExpectation = new MockLog.PatternSeenEventExpectation(
"spatial stats request",
TransportLogger.class.getCanonicalName(),
Level.TRACE,
readPattern
);

InboundMessage inboundMessage = new InboundMessage(new Header(
0,
0,
TransportStatus.setRequest((byte) 0),
TransportVersion.current()
), new ActionNotFoundTransportException("cluster:monitor/xpack/spatial/stats"));

try (var mockLog = MockLog.capture(TransportLogger.class)) {
mockLog.addExpectation(readExpectation);
TransportLogger.logInboundMessage(mock(TcpChannel.class), inboundMessage);
mockLog.assertAllExpectationsMatched();
}
}

private BytesReference buildRequest() throws IOException {
BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
Compression.Scheme compress = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null);
Expand Down