Browse Source

Fix `NullPointerException` in transport trace logger (#132243)

When trace-level logging is enabled, a node might disconnect from
cluster due to an NPE that causes the transport connection closed
between the data node and the master node.

InboundMessages printed by `TransportLogger` might throw an NPE in the
format function because `content` might be NULL if another node sends an
abnormal exception response.

Also there's no good reason to close the connection because of a logging
exception, so with this commit we catch all exceptions (rather than just
`IOException`)
Howard 1 month ago
parent
commit
7577951087

+ 5 - 0
docs/changelog/132243.yaml

@@ -0,0 +1,5 @@
+pr: 132243
+summary: Fix `NullPointerException` in transport trace logger
+area: Network
+type: bug
+issues: []

+ 17 - 41
server/src/main/java/org/elasticsearch/transport/TransportLogger.java

@@ -13,7 +13,6 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.IOUtils;
@@ -30,7 +29,7 @@ public final class TransportLogger {
             try {
                 String logMessage = format(channel, message, "READ");
                 logger.trace(logMessage);
-            } catch (IOException e) {
+            } catch (Exception e) {
                 logger.warn("an exception occurred formatting a READ trace message", e);
             }
         }
@@ -41,7 +40,7 @@ public final class TransportLogger {
             try {
                 String logMessage = format(channel, message, "READ");
                 logger.trace(logMessage);
-            } catch (IOException e) {
+            } catch (Exception e) {
                 logger.warn("an exception occurred formatting a READ trace message", e);
             }
         }
@@ -57,7 +56,7 @@ public final class TransportLogger {
                 BytesReference withoutHeader = message.slice(HEADER_SIZE, message.length() - HEADER_SIZE);
                 String logMessage = format(channel, withoutHeader, "WRITE");
                 logger.trace(logMessage);
-            } catch (IOException e) {
+            } catch (Exception e) {
                 logger.warn("an exception occurred formatting a WRITE trace message", e);
             }
         }
@@ -111,55 +110,32 @@ public final class TransportLogger {
         return sb.toString();
     }
 
-    private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
+    private static String format(TcpChannel channel, InboundMessage message, String event) {
         final StringBuilder sb = new StringBuilder();
         sb.append(channel);
 
         if (message.isPing()) {
             sb.append(" [ping]").append(' ').append(event).append(": ").append(6).append('B');
         } else {
-            boolean success = false;
             Header header = message.getHeader();
             int networkMessageSize = header.getNetworkMessageSize();
             int messageLengthWithHeader = HEADER_SIZE + networkMessageSize;
-            StreamInput streamInput = message.openOrGetStreamInput();
-            try {
-                final long requestId = header.getRequestId();
-                final boolean isRequest = header.isRequest();
-                final String type = isRequest ? "request" : "response";
-                final String version = header.getVersion().toString();
-                sb.append(" [length: ").append(messageLengthWithHeader);
-                sb.append(", request id: ").append(requestId);
-                sb.append(", type: ").append(type);
-                sb.append(", version: ").append(version);
+            final long requestId = header.getRequestId();
+            final boolean isRequest = header.isRequest();
+            final String type = isRequest ? "request" : "response";
+            final String version = header.getVersion().toString();
+            sb.append(" [length: ").append(messageLengthWithHeader);
+            sb.append(", request id: ").append(requestId);
+            sb.append(", type: ").append(type);
+            sb.append(", version: ").append(version);
 
-                // TODO: Maybe Fix for BWC
-                if (header.needsToReadVariableHeader() == false && isRequest) {
-                    sb.append(", action: ").append(header.getActionName());
-                }
-                sb.append(']');
-                sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
-                success = true;
-            } finally {
-                if (success) {
-                    IOUtils.close(streamInput);
-                } else {
-                    IOUtils.closeWhileHandlingException(streamInput);
-                }
+            // TODO: Maybe Fix for BWC
+            if (header.needsToReadVariableHeader() == false && isRequest) {
+                sb.append(", action: ").append(header.getActionName());
             }
+            sb.append(']');
+            sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
         }
         return sb.toString();
     }
-
-    private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException {
-        if (TransportStatus.isCompress(status) && streamInput.available() > 0) {
-            try {
-                return CompressorFactory.COMPRESSOR.threadLocalStreamInput(streamInput);
-            } catch (IllegalArgumentException e) {
-                throw new IllegalStateException("stream marked as compressed, but is missing deflate header");
-            }
-        } else {
-            return streamInput;
-        }
-    }
 }

+ 22 - 0
server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java

@@ -66,6 +66,28 @@ public class TransportLoggerTests extends ESTestCase {
         }
     }
 
+    public void testLoggingHandlerWithExceptionMessage() {
+        final String readPattern = ".*\\[length: \\d+" + ", request id: \\d+" + ", type: request" + ", version: .*" + " READ: \\d+B";
+
+        final MockLog.LoggingExpectation readExpectation = new MockLog.PatternSeenEventExpectation(
+            "spatial stats request",
+            TransportLogger.class.getCanonicalName(),
+            Level.TRACE,
+            readPattern
+        );
+
+        InboundMessage inboundMessage = new InboundMessage(
+            new Header(0, 0, TransportStatus.setRequest((byte) 0), TransportVersion.current()),
+            new ActionNotFoundTransportException("cluster:monitor/xpack/spatial/stats")
+        );
+
+        try (var mockLog = MockLog.capture(TransportLogger.class)) {
+            mockLog.addExpectation(readExpectation);
+            TransportLogger.logInboundMessage(mock(TcpChannel.class), inboundMessage);
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
+
     private BytesReference buildRequest() throws IOException {
         BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
         Compression.Scheme compress = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null);