
Track histogram of transport handling times (#80581)

Adds to the transport node stats a record of the distribution of the
times for which a transport thread was handling a message, represented
as a histogram.

Closes #80428
David Turner, 3 years ago
commit 54e0370b3e
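
For orientation, a sketch (not part of the commit) of how the new histograms surface on the server side; the getters come from the TransportStats changes below, while nodeStats is a hypothetical NodeStats instance obtained via the nodes stats API:

    // Sketch only: nodeStats is assumed to be an org.elasticsearch.action.admin.cluster.node.stats.NodeStats
    TransportStats transportStats = nodeStats.getTransport();
    long[] inbound = transportStats.getInboundHandlingTimeBucketFrequencies();   // 18 buckets
    long[] outbound = transportStats.getOutboundHandlingTimeBucketFrequencies(); // index 0: < 1ms, index 17: >= 65536ms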

+ 48 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -1899,6 +1899,54 @@ Size of TX packets sent by the node during internal cluster communication.
 (integer)
 Size, in bytes, of TX packets sent by the node during internal cluster
 communication.
+
+`inbound_handling_time_histogram`::
+(array)
+The distribution of the time spent handling each inbound message on a transport
+thread, represented as a histogram.
++
+.Properties of `inbound_handling_time_histogram`
+[%collapsible]
+=======
+`ge_millis`::
+(integer)
+The inclusive lower bound of the bucket in milliseconds. Omitted on the first
+bucket since this bucket has no lower bound.
+
+`lt_millis`::
+(integer)
+The exclusive upper bound of the bucket in milliseconds. Omitted on the last
+bucket since this bucket has no upper bound.
+
+`count`::
+(integer)
+The number of times a transport thread took a period of time within the bounds
+of this bucket to handle an inbound message.
+=======
+
+`outbound_handling_time_histogram`::
+(array)
+The distribution of the time spent sending each outbound transport message on a
+transport thread, represented as a histogram.
++
+.Properties of `outbound_handling_time_histogram`
+[%collapsible]
+=======
+`ge_millis`::
+(integer)
+The inclusive lower bound of the bucket in milliseconds. Omitted on the first
+bucket since this bucket has no lower bound.
+
+`lt_millis`::
+(integer)
+The exclusive upper bound of the bucket in milliseconds. Omitted on the last
+bucket since this bucket has no upper bound.
+
+`count`::
+(integer)
+The number of times a transport thread took a period of time within the bounds
+of this bucket to send a transport message.
+=======
 ======
 
 [[cluster-nodes-stats-api-response-body-http]]
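
As a worked illustration of the bucket layout documented above, here is a sketch assuming the default bounds from HandlingTimeTracker.getBucketUpperBounds() introduced later in this commit. It prints each bucket as it appears in the response, with ge_millis omitted on the first bucket and lt_millis omitted on the last:

    // Illustrative only: reproduces the documented ge_millis/lt_millis layout from the default bounds.
    int[] bounds = HandlingTimeTracker.getBucketUpperBounds(); // 1, 2, 4, ..., 65536 (17 values)
    for (int i = 0; i <= bounds.length; i++) {                 // 18 buckets in total
        String ge = i == 0 ? "" : "ge_millis=" + bounds[i - 1] + " ";
        String lt = i == bounds.length ? "" : "lt_millis=" + bounds[i] + " ";
        System.out.println("bucket " + i + ": " + ge + lt + "count=<frequency>");
    }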

+ 45 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/60_transport_stats.yml

@@ -20,3 +20,48 @@
   - gte: { nodes.$node_id.transport.tx_count: 0 }
   - gte: { nodes.$node_id.transport.rx_size_in_bytes: 0 }
   - gte: { nodes.$node_id.transport.tx_size_in_bytes: 0 }
+
+---
+"Transport handling time histogram":
+  - skip:
+      version: " - 8.0.99"
+      reason: "handling_time_histograms were added in 8.1"
+      features: [arbitrary_key]
+
+  - do:
+      nodes.info: {}
+  - set:
+      nodes._arbitrary_key_: node_id
+
+  - do:
+      nodes.stats:
+        metric: [ transport ]
+
+  - length: { nodes.$node_id.transport.inbound_handling_time_histogram: 18 }
+
+  - gte:    { nodes.$node_id.transport.inbound_handling_time_histogram.0.count: 0 }
+  - is_false: nodes.$node_id.transport.inbound_handling_time_histogram.0.ge_millis
+  - match:  { nodes.$node_id.transport.inbound_handling_time_histogram.0.lt_millis: 1 }
+
+  - gte:    { nodes.$node_id.transport.inbound_handling_time_histogram.1.count: 0 }
+  - match:  { nodes.$node_id.transport.inbound_handling_time_histogram.1.ge_millis: 1 }
+  - match:  { nodes.$node_id.transport.inbound_handling_time_histogram.1.lt_millis: 2 }
+
+  - gte:    { nodes.$node_id.transport.inbound_handling_time_histogram.17.count: 0 }
+  - match:  { nodes.$node_id.transport.inbound_handling_time_histogram.17.ge_millis: 65536 }
+  - is_false: nodes.$node_id.transport.inbound_handling_time_histogram.17.lt_millis
+
+
+  - length: { nodes.$node_id.transport.outbound_handling_time_histogram: 18 }
+
+  - gte:    { nodes.$node_id.transport.outbound_handling_time_histogram.0.count: 0 }
+  - is_false: nodes.$node_id.transport.outbound_handling_time_histogram.0.ge_millis
+  - match:  { nodes.$node_id.transport.outbound_handling_time_histogram.0.lt_millis: 1 }
+
+  - gte:    { nodes.$node_id.transport.outbound_handling_time_histogram.1.count: 0 }
+  - match:  { nodes.$node_id.transport.outbound_handling_time_histogram.1.ge_millis: 1 }
+  - match:  { nodes.$node_id.transport.outbound_handling_time_histogram.1.lt_millis: 2 }
+
+  - gte:    { nodes.$node_id.transport.outbound_handling_time_histogram.17.count: 0 }
+  - match:  { nodes.$node_id.transport.outbound_handling_time_histogram.17.ge_millis: 65536 }
+  - is_false: nodes.$node_id.transport.outbound_handling_time_histogram.17.lt_millis

+ 65 - 0
server/src/main/java/org/elasticsearch/common/network/HandlingTimeTracker.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.network;
+
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Tracks how long message handling takes on a transport thread as a histogram with fixed buckets.
+ */
+public class HandlingTimeTracker {
+
+    public static int[] getBucketUpperBounds() {
+        int[] bounds = new int[17];
+        for (int i = 0; i < bounds.length; i++) {
+            bounds[i] = 1 << i;
+        }
+        return bounds;
+    }
+
+    private static int getBucket(long handlingTimeMillis) {
+        if (handlingTimeMillis <= 0) {
+            return 0;
+        } else if (LAST_BUCKET_LOWER_BOUND <= handlingTimeMillis) {
+            return BUCKET_COUNT - 1;
+        } else {
+            return Long.SIZE - Long.numberOfLeadingZeros(handlingTimeMillis);
+        }
+    }
+
+    public static final int BUCKET_COUNT = getBucketUpperBounds().length + 1;
+
+    private static final long LAST_BUCKET_LOWER_BOUND = getBucketUpperBounds()[BUCKET_COUNT - 2];
+
+    private final LongAdder[] buckets;
+
+    public HandlingTimeTracker() {
+        buckets = new LongAdder[BUCKET_COUNT];
+        for (int i = 0; i < BUCKET_COUNT; i++) {
+            buckets[i] = new LongAdder();
+        }
+    }
+
+    public void addHandlingTime(long handlingTimeMillis) {
+        buckets[getBucket(handlingTimeMillis)].increment();
+    }
+
+    /**
+     * @return An array of frequencies of handling times in buckets with upper bounds as returned by {@link #getBucketUpperBounds()}, plus
+     *         an extra bucket for handling times longer than the longest upper bound.
+     */
+    public long[] getHistogram() {
+        final long[] histogram = new long[BUCKET_COUNT];
+        for (int i = 0; i < BUCKET_COUNT; i++) {
+            histogram[i] = buckets[i].longValue();
+        }
+        return histogram;
+    }
+
+}
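
A minimal usage sketch of the tracker, mirroring the pattern the handlers below adopt (measure with the thread pool's raw relative clock, record in a finally block). Here threadPool, message and handleMessage are hypothetical stand-ins for the caller's context:

    // Sketch: how InboundHandler, OutboundHandler and AbstractHttpServerTransport feed the tracker.
    HandlingTimeTracker tracker = new HandlingTimeTracker();
    final long startTime = threadPool.rawRelativeTimeInMillis(); // raw clock, not the cached estimate
    try {
        handleMessage(message); // hypothetical: the actual message handling happens here
    } finally {
        tracker.addHandlingTime(threadPool.rawRelativeTimeInMillis() - startTime);
    }
    long[] histogram = tracker.getHistogram(); // 18 counts, one per bucket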

+ 5 - 0
server/src/main/java/org/elasticsearch/common/network/NetworkService.java

@@ -90,11 +90,16 @@ public final class NetworkService {
     }
 
     private final List<CustomNameResolver> customNameResolvers;
+    private final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker();
 
     public NetworkService(List<CustomNameResolver> customNameResolvers) {
         this.customNameResolvers = Objects.requireNonNull(customNameResolvers, "customNameResolvers must be non null");
     }
 
+    public HandlingTimeTracker getHandlingTimeTracker() {
+        return handlingTimeTracker;
+    }
+
     /**
      * Resolves {@code bindHosts} to a list of internet addresses. The list will
      * not contain duplicate addresses.

+ 3 - 2
server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java

@@ -355,11 +355,12 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
      */
     public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
         httpClientStatsTracker.updateClientStats(httpRequest, httpChannel);
-        final long startTime = threadPool.relativeTimeInMillis();
+        final long startTime = threadPool.rawRelativeTimeInMillis();
         try {
             handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException());
         } finally {
-            final long took = threadPool.relativeTimeInMillis() - startTime;
+            final long took = threadPool.rawRelativeTimeInMillis() - startTime;
+            networkService.getHandlingTimeTracker().addHandlingTime(took);
             final long logThreshold = slowLogThresholdMs;
             if (logThreshold > 0 && took > logThreshold) {
                 logger.warn(

+ 8 - 3
server/src/main/java/org/elasticsearch/transport/InboundHandler.java

@@ -17,6 +17,7 @@ import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.TimeValue;
@@ -40,6 +41,7 @@ public class InboundHandler {
     private final TransportHandshaker handshaker;
     private final TransportKeepAlive keepAlive;
     private final Transport.ResponseHandlers responseHandlers;
+    private final HandlingTimeTracker handlingTimeTracker;
     private final Transport.RequestHandlers requestHandlers;
 
     private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
@@ -53,7 +55,8 @@ public class InboundHandler {
         TransportHandshaker handshaker,
         TransportKeepAlive keepAlive,
         Transport.RequestHandlers requestHandlers,
-        Transport.ResponseHandlers responseHandlers
+        Transport.ResponseHandlers responseHandlers,
+        HandlingTimeTracker handlingTimeTracker
     ) {
         this.threadPool = threadPool;
         this.outboundHandler = outboundHandler;
@@ -62,6 +65,7 @@ public class InboundHandler {
         this.keepAlive = keepAlive;
         this.requestHandlers = requestHandlers;
         this.responseHandlers = responseHandlers;
+        this.handlingTimeTracker = handlingTimeTracker;
     }
 
     void setMessageListener(TransportMessageListener listener) {
@@ -77,7 +81,7 @@ public class InboundHandler {
     }
 
     void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception {
-        final long startTime = threadPool.relativeTimeInMillis();
+        final long startTime = threadPool.rawRelativeTimeInMillis();
         channel.getChannelStats().markAccessed(startTime);
         TransportLogger.logInboundMessage(channel, message);
 
@@ -155,7 +159,8 @@ public class InboundHandler {
                 }
             }
         } finally {
-            final long took = threadPool.relativeTimeInMillis() - startTime;
+            final long took = threadPool.rawRelativeTimeInMillis() - startTime;
+            handlingTimeTracker.addHandlingTime(took);
             final long logThreshold = slowLogThresholdMs;
             if (logThreshold > 0 && took > logThreshold) {
                 if (isRequest) {

+ 14 - 3
server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.network.CloseableChannel;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.transport.NetworkExceptionHelper;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -37,17 +38,26 @@ final class OutboundHandler {
     private final StatsTracker statsTracker;
     private final ThreadPool threadPool;
     private final Recycler<BytesRef> recycler;
+    private final HandlingTimeTracker handlingTimeTracker;
 
     private volatile long slowLogThresholdMs = Long.MAX_VALUE;
 
     private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
 
-    OutboundHandler(String nodeName, Version version, StatsTracker statsTracker, ThreadPool threadPool, Recycler<BytesRef> recycler) {
+    OutboundHandler(
+        String nodeName,
+        Version version,
+        StatsTracker statsTracker,
+        ThreadPool threadPool,
+        Recycler<BytesRef> recycler,
+        HandlingTimeTracker handlingTimeTracker
+    ) {
         this.nodeName = nodeName;
         this.version = version;
         this.statsTracker = statsTracker;
         this.threadPool = threadPool;
         this.recycler = recycler;
+        this.handlingTimeTracker = handlingTimeTracker;
     }
 
     void setSlowLogThreshold(TimeValue slowLogThreshold) {
@@ -168,7 +178,7 @@ final class OutboundHandler {
         @Nullable OutboundMessage message,
         ActionListener<Void> listener
     ) {
-        final long startTime = threadPool.relativeTimeInMillis();
+        final long startTime = threadPool.rawRelativeTimeInMillis();
         channel.getChannelStats().markAccessed(startTime);
         final long messageSize = reference.length();
         TransportLogger.logOutboundMessage(channel, reference);
@@ -196,7 +206,8 @@ final class OutboundHandler {
                 private void maybeLogSlowMessage(boolean success) {
                     final long logThreshold = slowLogThresholdMs;
                     if (logThreshold > 0) {
-                        final long took = threadPool.relativeTimeInMillis() - startTime;
+                        final long took = threadPool.rawRelativeTimeInMillis() - startTime;
+                        handlingTimeTracker.addHandlingTime(took);
                         if (took > logThreshold) {
                             logger.warn(
                                 "sending transport message [{}] of size [{}] on [{}] took [{}ms] which is above the warn "

+ 8 - 3
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.network.CloseableChannel;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.network.NetworkUtils;
@@ -116,6 +117,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
 
     private final TransportHandshaker handshaker;
     private final TransportKeepAlive keepAlive;
+    private final HandlingTimeTracker outboundHandlingTimeTracker = new HandlingTimeTracker();
     private final OutboundHandler outboundHandler;
     private final InboundHandler inboundHandler;
     private final ResponseHandlers responseHandlers = new ResponseHandlers();
@@ -141,7 +143,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         String nodeName = Node.NODE_NAME_SETTING.get(settings);
 
         this.recycler = createRecycler(settings, pageCacheRecycler);
-        this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, recycler);
+        this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, recycler, outboundHandlingTimeTracker);
         this.handshaker = new TransportHandshaker(
             version,
             threadPool,
@@ -165,7 +167,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
             handshaker,
             keepAlive,
             requestHandlers,
-            responseHandlers
+            responseHandlers,
+            networkService.getHandlingTimeTracker()
         );
     }
 
@@ -918,7 +921,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
             messagesReceived,
             bytesRead,
             messagesSent,
-            bytesWritten
+            bytesWritten,
+            networkService.getHandlingTimeTracker().getHistogram(),
+            outboundHandlingTimeTracker.getHistogram()
         );
     }
 

+ 89 - 2
server/src/main/java/org/elasticsearch/transport/TransportStats.java

@@ -8,14 +8,17 @@
 
 package org.elasticsearch.transport;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xcontent.ToXContentFragment;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 public class TransportStats implements Writeable, ToXContentFragment {
 
@@ -25,14 +28,28 @@ public class TransportStats implements Writeable, ToXContentFragment {
     private final long rxSize;
     private final long txCount;
     private final long txSize;
-
-    public TransportStats(long serverOpen, long totalOutboundConnections, long rxCount, long rxSize, long txCount, long txSize) {
+    private final long[] inboundHandlingTimeBucketFrequencies;
+    private final long[] outboundHandlingTimeBucketFrequencies;
+
+    public TransportStats(
+        long serverOpen,
+        long totalOutboundConnections,
+        long rxCount,
+        long rxSize,
+        long txCount,
+        long txSize,
+        long[] inboundHandlingTimeBucketFrequencies,
+        long[] outboundHandlingTimeBucketFrequencies
+    ) {
         this.serverOpen = serverOpen;
         this.totalOutboundConnections = totalOutboundConnections;
         this.rxCount = rxCount;
         this.rxSize = rxSize;
         this.txCount = txCount;
         this.txSize = txSize;
+        this.inboundHandlingTimeBucketFrequencies = inboundHandlingTimeBucketFrequencies;
+        this.outboundHandlingTimeBucketFrequencies = outboundHandlingTimeBucketFrequencies;
+        assert assertHistogramsConsistent();
     }
 
     public TransportStats(StreamInput in) throws IOException {
@@ -42,6 +59,20 @@ public class TransportStats implements Writeable, ToXContentFragment {
         rxSize = in.readVLong();
         txCount = in.readVLong();
         txSize = in.readVLong();
+        if (in.getVersion().onOrAfter(Version.V_8_1_0) && in.readBoolean()) {
+            inboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
+            for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
+                inboundHandlingTimeBucketFrequencies[i] = in.readVLong();
+            }
+            outboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
+            for (int i = 0; i < outboundHandlingTimeBucketFrequencies.length; i++) {
+                outboundHandlingTimeBucketFrequencies[i] = in.readVLong();
+            }
+        } else {
+            inboundHandlingTimeBucketFrequencies = new long[0];
+            outboundHandlingTimeBucketFrequencies = new long[0];
+        }
+        assert assertHistogramsConsistent();
     }
 
     @Override
@@ -52,6 +83,16 @@ public class TransportStats implements Writeable, ToXContentFragment {
         out.writeVLong(rxSize);
         out.writeVLong(txCount);
         out.writeVLong(txSize);
+        if (out.getVersion().onOrAfter(Version.V_8_1_0)) {
+            assert (inboundHandlingTimeBucketFrequencies.length > 0) == (outboundHandlingTimeBucketFrequencies.length > 0);
+            out.writeBoolean(inboundHandlingTimeBucketFrequencies.length > 0);
+            for (long handlingTimeBucketFrequency : inboundHandlingTimeBucketFrequencies) {
+                out.writeVLong(handlingTimeBucketFrequency);
+            }
+            for (long handlingTimeBucketFrequency : outboundHandlingTimeBucketFrequencies) {
+                out.writeVLong(handlingTimeBucketFrequency);
+            }
+        }
     }
 
     public long serverOpen() {
@@ -94,6 +135,25 @@ public class TransportStats implements Writeable, ToXContentFragment {
         return txSize();
     }
 
+    public long[] getInboundHandlingTimeBucketFrequencies() {
+        return Arrays.copyOf(inboundHandlingTimeBucketFrequencies, inboundHandlingTimeBucketFrequencies.length);
+    }
+
+    public long[] getOutboundHandlingTimeBucketFrequencies() {
+        return Arrays.copyOf(outboundHandlingTimeBucketFrequencies, outboundHandlingTimeBucketFrequencies.length);
+    }
+
+    private boolean assertHistogramsConsistent() {
+        assert inboundHandlingTimeBucketFrequencies.length == outboundHandlingTimeBucketFrequencies.length;
+        if (inboundHandlingTimeBucketFrequencies.length == 0) {
+            // Stats came from before v8.1
+            assert Version.CURRENT.major == Version.V_8_0_0.major;
+        } else {
+            assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
+        }
+        return true;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject(Fields.TRANSPORT);
@@ -103,10 +163,35 @@ public class TransportStats implements Writeable, ToXContentFragment {
         builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, new ByteSizeValue(rxSize));
         builder.field(Fields.TX_COUNT, txCount);
         builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, new ByteSizeValue(txSize));
+        if (inboundHandlingTimeBucketFrequencies.length > 0) {
+            histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM);
+            histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM);
+        } else {
+            // Stats came from before v8.1
+            assert Version.CURRENT.major == Version.V_8_0_0.major;
+        }
         builder.endObject();
         return builder;
     }
 
+    private void histogramToXContent(XContentBuilder builder, long[] bucketFrequencies, String fieldName) throws IOException {
+        final int[] bucketBounds = HandlingTimeTracker.getBucketUpperBounds();
+        assert bucketFrequencies.length == bucketBounds.length + 1;
+        builder.startArray(fieldName);
+        for (int i = 0; i < bucketFrequencies.length; i++) {
+            builder.startObject();
+            if (i > 0 && i <= bucketBounds.length) {
+                builder.field("ge_millis", bucketBounds[i - 1]);
+            }
+            if (i < bucketBounds.length) {
+                builder.field("lt_millis", bucketBounds[i]);
+            }
+            builder.field("count", bucketFrequencies[i]);
+            builder.endObject();
+        }
+        builder.endArray();
+    }
+
     static final class Fields {
         static final String TRANSPORT = "transport";
         static final String SERVER_OPEN = "server_open";
@@ -117,5 +202,7 @@ public class TransportStats implements Writeable, ToXContentFragment {
         static final String TX_COUNT = "tx_count";
         static final String TX_SIZE = "tx_size";
         static final String TX_SIZE_IN_BYTES = "tx_size_in_bytes";
+        static final String INBOUND_HANDLING_TIME_HISTOGRAM = "inbound_handling_time_histogram";
+        static final String OUTBOUND_HANDLING_TIME_HISTOGRAM = "outbound_handling_time_histogram";
     }
 }
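
To make the version gate concrete, a hedged round-trip sketch (test-style, not part of the commit): on an 8.1+ stream the boolean flag plus 2 x 18 VLong frequencies travel over the wire, while a pre-8.1 stream skips them and the histograms deserialize as empty arrays:

    // Sketch, assuming both sides are on 8.1; set an older version on the streams to simulate a pre-8.1 peer.
    long[] inbound = new long[HandlingTimeTracker.BUCKET_COUNT];
    long[] outbound = new long[HandlingTimeTracker.BUCKET_COUNT];
    TransportStats stats = new TransportStats(1, 1, 10, 1024, 10, 2048, inbound, outbound);
    try (BytesStreamOutput out = new BytesStreamOutput()) {
        out.setVersion(Version.V_8_1_0);
        stats.writeTo(out);
        StreamInput in = out.bytes().streamInput();
        in.setVersion(Version.V_8_1_0);
        TransportStats read = new TransportStats(in); // histograms round-trip intact on 8.1+ streams
    }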

+ 13 - 1
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Stats.Re
 import org.elasticsearch.cluster.service.ClusterStateUpdateStats;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.discovery.DiscoveryStats;
 import org.elasticsearch.http.HttpStats;
@@ -47,6 +48,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -238,6 +240,14 @@ public class NodeStatsTests extends ESTestCase {
                     assertEquals(nodeStats.getTransport().getServerOpen(), deserializedNodeStats.getTransport().getServerOpen());
                     assertEquals(nodeStats.getTransport().getTxCount(), deserializedNodeStats.getTransport().getTxCount());
                     assertEquals(nodeStats.getTransport().getTxSize(), deserializedNodeStats.getTransport().getTxSize());
+                    assertArrayEquals(
+                        nodeStats.getTransport().getInboundHandlingTimeBucketFrequencies(),
+                        deserializedNodeStats.getTransport().getInboundHandlingTimeBucketFrequencies()
+                    );
+                    assertArrayEquals(
+                        nodeStats.getTransport().getOutboundHandlingTimeBucketFrequencies(),
+                        deserializedNodeStats.getTransport().getOutboundHandlingTimeBucketFrequencies()
+                    );
                 }
                 if (nodeStats.getHttp() == null) {
                     assertNull(deserializedNodeStats.getHttp());
@@ -672,7 +682,9 @@ public class NodeStatsTests extends ESTestCase {
                 randomNonNegativeLong(),
                 randomNonNegativeLong(),
                 randomNonNegativeLong(),
-                randomNonNegativeLong()
+                randomNonNegativeLong(),
+                IntStream.range(0, HandlingTimeTracker.BUCKET_COUNT).mapToLong(i -> randomNonNegativeLong()).toArray(),
+                IntStream.range(0, HandlingTimeTracker.BUCKET_COUNT).mapToLong(i -> randomNonNegativeLong()).toArray()
             )
             : null;
         HttpStats httpStats = null;

+ 83 - 0
server/src/test/java/org/elasticsearch/common/network/HandlingTimeTrackerTests.java

@@ -0,0 +1,83 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.network;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.greaterThan;
+
+public class HandlingTimeTrackerTests extends ESTestCase {
+
+    public void testHistogram() {
+        final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker();
+
+        assertArrayEquals(new long[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(0L);
+        assertArrayEquals(new long[] { 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(1L);
+        assertArrayEquals(new long[] { 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(2L);
+        assertArrayEquals(new long[] { 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(3L);
+        assertArrayEquals(new long[] { 1, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(4L);
+        assertArrayEquals(new long[] { 1, 1, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(127L);
+        assertArrayEquals(new long[] { 1, 1, 2, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(128L);
+        assertArrayEquals(new long[] { 1, 1, 2, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(65535L);
+        assertArrayEquals(new long[] { 1, 1, 2, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(65536L);
+        assertArrayEquals(new long[] { 1, 1, 2, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(Long.MAX_VALUE);
+        assertArrayEquals(new long[] { 1, 1, 2, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 2 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(randomLongBetween(65536L, Long.MAX_VALUE));
+        assertArrayEquals(new long[] { 1, 1, 2, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 3 }, handlingTimeTracker.getHistogram());
+
+        handlingTimeTracker.addHandlingTime(randomLongBetween(Long.MIN_VALUE, 0L));
+        assertArrayEquals(new long[] { 2, 1, 2, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 3 }, handlingTimeTracker.getHistogram());
+    }
+
+    public void testHistogramRandom() {
+        final int[] upperBounds = HandlingTimeTracker.getBucketUpperBounds();
+        final long[] expectedCounts = new long[upperBounds.length + 1];
+        final HandlingTimeTracker handlingTimeTracker = new HandlingTimeTracker();
+        for (int i = between(0, 1000); i > 0; i--) {
+            final int bucket = between(0, expectedCounts.length - 1);
+            expectedCounts[bucket] += 1;
+
+            final int lowerBound = bucket == 0 ? 0 : upperBounds[bucket - 1];
+            final int upperBound = bucket == upperBounds.length ? randomBoolean() ? 100000 : Integer.MAX_VALUE : upperBounds[bucket] - 1;
+            handlingTimeTracker.addHandlingTime(between(lowerBound, upperBound));
+        }
+
+        assertArrayEquals(expectedCounts, handlingTimeTracker.getHistogram());
+    }
+
+    public void testBoundsConsistency() {
+        final int[] upperBounds = HandlingTimeTracker.getBucketUpperBounds();
+        assertThat(upperBounds[0], greaterThan(0));
+        for (int i = 1; i < upperBounds.length; i++) {
+            assertThat(upperBounds[i], greaterThan(upperBounds[i - 1]));
+        }
+    }
+
+}

+ 5 - 2
server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.core.TimeValue;
@@ -69,7 +70,8 @@ public class InboundHandlerTests extends ESTestCase {
             version,
             new StatsTracker(),
             threadPool,
-            new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE)
+            new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE),
+            new HandlingTimeTracker()
         );
         requestHandlers = new Transport.RequestHandlers();
         responseHandlers = new Transport.ResponseHandlers();
@@ -80,7 +82,8 @@ public class InboundHandlerTests extends ESTestCase {
             handshaker,
             keepAlive,
             requestHandlers,
-            responseHandlers
+            responseHandlers,
+            new HandlingTimeTracker()
         );
     }
 

+ 2 - 1
server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java

@@ -23,6 +23,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.PageCacheRecycler;
@@ -71,7 +72,7 @@ public class OutboundHandlerTests extends ESTestCase {
         node = new DiscoveryNode("", transportAddress, Version.CURRENT);
         StatsTracker statsTracker = new StatsTracker();
         compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
-        handler = new OutboundHandler("node", Version.CURRENT, statsTracker, threadPool, recycler);
+        handler = new OutboundHandler("node", Version.CURRENT, statsTracker, threadPool, recycler, new HandlingTimeTracker());
 
         final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime());
         final InboundDecoder decoder = new InboundDecoder(Version.CURRENT, this.recycler);

+ 3 - 1
server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.network.NetworkUtils;
 import org.elasticsearch.common.settings.Settings;
@@ -541,7 +542,8 @@ public class TcpTransportTests extends ESTestCase {
                     Version.CURRENT,
                     new StatsTracker(),
                     testThreadPool,
-                    new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY))
+                    new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY)),
+                    new HandlingTimeTracker()
                 )
             );
 

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.transport;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -24,7 +25,7 @@ public class TestTransportChannels {
     ) {
         BytesRefRecycler recycler = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
         return new TcpTransportChannel(
-            new OutboundHandler(nodeName, version, new StatsTracker(), threadPool, recycler),
+            new OutboundHandler(nodeName, version, new StatsTracker(), threadPool, recycler, new HandlingTimeTracker()),
             channel,
             action,
             requestId,