Browse Source

Expose HTTP connection metrics to telemetry (#130939)

Closes ES-12223
David Turner 3 tháng trước cách đây
mục cha
commit
c02bb4dd89

+ 5 - 0
docs/changelog/130939.yaml

@@ -0,0 +1,5 @@
+pr: 130939
+summary: Expose HTTP connection metrics to telemetry
+area: Network
+type: enhancement
+issues: []

+ 89 - 1
modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java

@@ -51,6 +51,8 @@ import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.http.HttpServerTransport;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.plugins.TelemetryPlugin;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
 import org.elasticsearch.rest.RestController;
@@ -60,6 +62,8 @@ import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.EmptyResponseListener;
 import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.telemetry.Measurement;
+import org.elasticsearch.telemetry.TestTelemetryPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.transport.netty4.NettyAllocator;
 import org.elasticsearch.xcontent.ToXContentObject;
@@ -91,7 +95,7 @@ public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return CollectionUtils.concatLists(
-            List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
+            List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class, TestTelemetryPlugin.class),
             super.nodePlugins()
         );
     }
@@ -281,6 +285,90 @@ public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
         }
     }
 
+    public void testConnectionStatsExposedToTelemetryPlugin() throws Exception {
+        final var targetNode = internalCluster().startNode();
+
+        final var telemetryPlugin = asInstanceOf(
+            TestTelemetryPlugin.class,
+            internalCluster().getInstance(PluginsService.class, targetNode).filterPlugins(TelemetryPlugin.class).findAny().orElseThrow()
+        );
+
+        assertHttpMetrics(telemetryPlugin, 0L, 0L);
+
+        final var releasables = new ArrayList<Releasable>(3);
+        try {
+            final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
+            releasables.add(keepPipeliningRequest::release);
+
+            final var responseReceivedLatch = new CountDownLatch(1);
+
+            final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
+            releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
+            final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
+                .option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
+                .group(eventLoopGroup)
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) {
+                        ch.pipeline().addLast(new HttpClientCodec());
+                        ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {
+
+                            @Override
+                            protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
+                                responseReceivedLatch.countDown();
+                            }
+
+                            @Override
+                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+                                ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
+                            }
+                        });
+                    }
+                });
+
+            final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, targetNode);
+            final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();
+
+            // Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
+            final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
+            releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());
+
+            if (randomBoolean()) {
+                // assertBusy because client-side connect may complete before server-side
+                assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 1L));
+            } else {
+                // Send two pipelined requests so that we start to receive responses
+                pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
+                pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
+
+                // wait until we've started to receive responses (but we won't have received them all) - server side is definitely open now
+                safeAwait(responseReceivedLatch);
+                assertHttpMetrics(telemetryPlugin, 1L, 1L);
+            }
+        } finally {
+            Collections.reverse(releasables);
+            Releasables.close(releasables);
+        }
+
+        // assertBusy because client-side close may complete before server-side
+        assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 0L));
+    }
+
+    private static void assertHttpMetrics(TestTelemetryPlugin telemetryPlugin, long expectedTotal, long expectedCurrent) {
+        try {
+            telemetryPlugin.collect();
+            assertMeasurement(telemetryPlugin.getLongAsyncCounterMeasurement("es.http.connections.total"), expectedTotal);
+            assertMeasurement(telemetryPlugin.getLongGaugeMeasurement("es.http.connections.current"), expectedCurrent);
+        } finally {
+            telemetryPlugin.resetMeter();
+        }
+    }
+
+    private static void assertMeasurement(List<Measurement> measurements, long expectedValue) {
+        assertThat(measurements, hasSize(1));
+        assertThat(measurements.get(0).getLong(), equalTo(expectedValue));
+    }
+
     private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
         // check if opaque ids are monotonically increasing
         int i = 0;

+ 31 - 0
server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java

@@ -41,6 +41,8 @@ import org.elasticsearch.rest.RestChannel;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.telemetry.TelemetryProvider;
+import org.elasticsearch.telemetry.metric.LongWithAttributes;
+import org.elasticsearch.telemetry.metric.MeterRegistry;
 import org.elasticsearch.telemetry.tracing.Tracer;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BindTransportException;
@@ -103,6 +105,8 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
 
     private final HttpTracer httpLogger;
     private final Tracer tracer;
+    private final MeterRegistry meterRegistry;
+    private final List<AutoCloseable> metricsToClose = new ArrayList<>(2);
     private volatile boolean shuttingDown;
     private final ReadWriteLock shuttingDownRWLock = new StampedLock().asReadWriteLock();
 
@@ -142,6 +146,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
 
         this.maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings);
         this.tracer = telemetryProvider.getTracer();
+        this.meterRegistry = telemetryProvider.getMeterRegistry();
         this.httpLogger = new HttpTracer(settings, clusterSettings);
         clusterSettings.addSettingsUpdateConsumer(
             TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING,
@@ -238,6 +243,22 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
 
     @Override
     protected final void doStart() {
+        metricsToClose.add(
+            meterRegistry.registerLongAsyncCounter(
+                "es.http.connections.total",
+                "total number of inbound HTTP connections accepted",
+                "count",
+                () -> new LongWithAttributes(totalChannelsAccepted.get())
+            )
+        );
+        metricsToClose.add(
+            meterRegistry.registerLongGauge(
+                "es.http.connections.current",
+                "number of inbound HTTP connections currently open",
+                "count",
+                () -> new LongWithAttributes(httpChannels.size())
+            )
+        );
         startInternal();
     }
 
@@ -328,6 +349,16 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
                 logger.warn("unexpected exception while waiting for http channels to close", e);
             }
         }
+
+        for (final var metricToClose : metricsToClose) {
+            try {
+                metricToClose.close();
+            } catch (Exception e) {
+                logger.warn("unexpected exception while closing metric [{}]", metricToClose);
+                assert false : e;
+            }
+        }
+
         stopInternal();
     }