Browse Source

Log Slowness on Sending Transport Messages (#67664)

Similar to #62444 but for the outbound path.

This does not detect slowness in individual transport handler logic,
this is done via the inbound handler logging already, but instead
warns if it takes a long time to hand off the message to the relevant
transport thread and then transfer the message over the wire.
This gives some visibility into the stability of the network
connection itself and into the reasons for slow network
responses (if they are the result of slow networking on the sender).
Armin Braun 4 years ago
parent
commit
6d025d3a27

+ 1 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

@@ -159,6 +159,7 @@ public class Netty4TcpChannel implements TcpChannel {
         return "Netty4TcpChannel{" +
             "localAddress=" + getLocalAddress() +
             ", remoteAddress=" + channel.remoteAddress() +
+            ", profile=" + profile +
             '}';
     }
 }

+ 1 - 0
plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java

@@ -77,6 +77,7 @@ public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
         return "TcpNioSocketChannel{" +
             "localAddress=" + getLocalAddress() +
             ", remoteAddress=" + getRemoteAddress() +
+            ", profile=" + profile +
             '}';
     }
 }

+ 25 - 1
server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.network.CloseableChannel;
 import org.elasticsearch.common.transport.NetworkExceptionHelper;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -50,6 +51,9 @@ final class OutboundHandler {
     private final StatsTracker statsTracker;
     private final ThreadPool threadPool;
     private final BigArrays bigArrays;
+
+    private volatile long slowLogThresholdMs = Long.MAX_VALUE;
+
     private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
 
     OutboundHandler(String nodeName, Version version, StatsTracker statsTracker, ThreadPool threadPool, BigArrays bigArrays) {
@@ -60,6 +64,10 @@ final class OutboundHandler {
         this.bigArrays = bigArrays;
     }
 
+    void setSlowLogThreshold(TimeValue slowLogThreshold) {
+        this.slowLogThresholdMs = slowLogThreshold.getMillis();
+    }
+
     void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
         SendContext sendContext = new SendContext(channel, () -> bytes, listener);
         try {
@@ -125,6 +133,7 @@ final class OutboundHandler {
         BytesReference reference = sendContext.get();
         // stash thread context so that channel event loop is not polluted by thread context
         try (ThreadContext.StoredContext existing = threadPool.getThreadContext().stashContext()) {
+            sendContext.startTime = threadPool.relativeTimeInMillis();
             channel.sendMessage(reference, sendContext);
         } catch (RuntimeException ex) {
             sendContext.onFailure(ex);
@@ -162,6 +171,11 @@ final class OutboundHandler {
         public void close() {
             IOUtils.closeWhileHandlingException(bytesStreamOutput);
         }
+
+        @Override
+        public String toString() {
+            return "MessageSerializer{" + message + "}";
+        }
     }
 
     private class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
@@ -171,6 +185,7 @@ final class OutboundHandler {
         private final ActionListener<Void> listener;
         private final Releasable optionalReleasable;
         private long messageSize = -1;
+        private long startTime;
 
         private SendContext(TcpChannel channel, CheckedSupplier<BytesReference, IOException> messageSupplier,
                             ActionListener<Void> listener) {
@@ -198,6 +213,15 @@ final class OutboundHandler {
             }
         }
 
+        private void maybeLogSlowMessage() {
+            final long took = threadPool.relativeTimeInMillis() - startTime;
+            final long logThreshold = slowLogThresholdMs;
+            if (logThreshold > 0 && took > logThreshold) {
+                logger.warn("sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn threshold of [{}ms]",
+                        messageSupplier, messageSize, channel, took, logThreshold);
+            }
+        }
+
         @Override
         protected void innerOnResponse(Void v) {
             assert messageSize != -1 : "If onResponse is being called, the message should have been serialized";
@@ -216,7 +240,7 @@ final class OutboundHandler {
         }
 
         private void closeAndCallback(Runnable runnable) {
-            Releasables.close(optionalReleasable, runnable::run);
+            Releasables.close(optionalReleasable, runnable::run, this::maybeLogSlowMessage);
         }
     }
 }

+ 13 - 1
server/src/main/java/org/elasticsearch/transport/OutboundMessage.java

@@ -32,7 +32,7 @@ import java.io.IOException;
 
 abstract class OutboundMessage extends NetworkMessage {
 
-    private final Writeable message;
+    protected final Writeable message;
 
     OutboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, Writeable message) {
         super(threadContext, version, status, requestId);
@@ -130,6 +130,12 @@ abstract class OutboundMessage extends NetworkMessage {
 
             return status;
         }
+
+
+        @Override
+        public String toString() {
+            return "Request{" + action + "}{" + requestId + "}{" + isError() + "}{" + isCompress() + "}{" + isHandshake() + "}";
+        }
     }
 
     static class Response extends OutboundMessage {
@@ -153,6 +159,12 @@ abstract class OutboundMessage extends NetworkMessage {
 
             return status;
         }
+
+        @Override
+        public String toString() {
+            return "Response{" + requestId + "}{" + isError() + "}{" + isCompress() + "}{" + isHandshake() + "}{"
+                    + message.getClass() + "}";
+        }
     }
 
     private static boolean canCompress(Writeable message) {

+ 1 - 0
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -186,6 +186,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     @Override
     public void setSlowLogThreshold(TimeValue slowLogThreshold) {
         inboundHandler.setSlowLogThreshold(slowLogThreshold);
+        outboundHandler.setSlowLogThreshold(slowLogThreshold);
     }
 
     public final class NodeChannels extends CloseableConnection {

+ 41 - 0
server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java

@@ -19,9 +19,13 @@
 
 package org.elasticsearch.transport;
 
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
@@ -30,6 +34,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
@@ -37,6 +42,7 @@ import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.internal.io.Streams;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
@@ -295,4 +301,39 @@ public class OutboundHandlerTests extends ESTestCase {
 
         assertEquals("header_value", header.getHeaders().v1().get("header"));
     }
+
+    public void testSlowLogOutboundMessage() throws Exception {
+        final MockLogAppender mockAppender = new MockLogAppender();
+        mockAppender.start();
+        mockAppender.addExpectation(
+                new MockLogAppender.SeenEventExpectation(
+                        "expected message",
+                        OutboundHandler.class.getCanonicalName(),
+                        Level.WARN,
+                        "sending transport message "));
+        final Logger outboundHandlerLogger = LogManager.getLogger(OutboundHandler.class);
+        Loggers.addAppender(outboundHandlerLogger, mockAppender);
+        handler.setSlowLogThreshold(TimeValue.timeValueMillis(5L));
+
+        try {
+            final int length = randomIntBetween(1, 100);
+            final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
+            handler.sendBytes(new FakeTcpChannel() {
+                @Override
+                public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
+                    try {
+                        TimeUnit.SECONDS.sleep(1L);
+                        listener.onResponse(null);
+                    } catch (InterruptedException e) {
+                        listener.onFailure(e);
+                    }
+                }
+            }, new BytesArray(randomByteArrayOfLength(length)), f);
+            f.get();
+            mockAppender.assertAllExpectationsMatched();
+        } finally {
+            Loggers.removeAppender(outboundHandlerLogger, mockAppender);
+            mockAppender.stop();
+        }
+    }
 }