Browse Source

Return only requested master stats (#110970)

Ievgen Degtiarenko 1 year ago
parent
commit
8fe73d6f44

+ 49 - 0
server/src/internalClusterTest/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionIT.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.rest.action.admin.cluster;
+
+import org.elasticsearch.action.admin.cluster.allocation.TransportGetAllocationStatsAction;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RestNodesStatsActionIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
+    }
+
+    public void testSendOnlyNecessaryElectedMasterNodeStatsRequest() {
+        var node = internalCluster().startDataOnlyNode();
+
+        var getAllocationStatsActions = new AtomicInteger(0);
+        MockTransportService.getInstance(node).addSendBehavior((connection, requestId, action, request, options) -> {
+            if (Objects.equals(action, TransportGetAllocationStatsAction.TYPE.name())) {
+                getAllocationStatsActions.incrementAndGet();
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+
+        var metrics = randomSubsetOf(Metric.values().length, Metric.values());
+        client(node).admin().cluster().nodesStats(new NodesStatsRequest().addMetrics(metrics)).actionGet();
+
+        var shouldSendGetAllocationStatsRequest = metrics.contains(Metric.ALLOCATIONS) || metrics.contains(Metric.FS);
+        assertThat(getAllocationStatsActions.get(), equalTo(shouldSendGetAllocationStatsRequest ? 1 : 0));
+    }
+}

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -217,6 +217,7 @@ public class TransportVersions {
     public static final TransportVersion ENRICH_CACHE_STATS_SIZE_ADDED = def(8_707_00_0);
     public static final TransportVersion ENTERPRISE_GEOIP_DOWNLOADER = def(8_708_00_0);
     public static final TransportVersion NODES_STATS_ENUM_SET = def(8_709_00_0);
+    public static final TransportVersion MASTER_NODE_METRICS = def(8_710_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 21 - 5
server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.MasterNodeReadRequest;
 import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
@@ -36,6 +37,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.Map;
 
 public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction<
@@ -88,10 +90,11 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
     protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
         listener.onResponse(
             new Response(
-                allocationStatsService.stats(),
-                featureService.clusterHasFeature(clusterService.state(), AllocationStatsFeatures.INCLUDE_DISK_THRESHOLD_SETTINGS)
-                    ? diskThresholdSettings
-                    : null
+                request.metrics().contains(Metric.ALLOCATIONS) ? allocationStatsService.stats() : Map.of(),
+                request.metrics().contains(Metric.FS)
+                    && featureService.clusterHasFeature(clusterService.state(), AllocationStatsFeatures.INCLUDE_DISK_THRESHOLD_SETTINGS)
+                        ? diskThresholdSettings
+                        : null
             )
         );
     }
@@ -103,19 +106,32 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
 
     public static class Request extends MasterNodeReadRequest<Request> {
 
-        public Request(TimeValue masterNodeTimeout, TaskId parentTaskId) {
+        private final EnumSet<Metric> metrics;
+
+        public Request(TimeValue masterNodeTimeout, TaskId parentTaskId, EnumSet<Metric> metrics) {
             super(masterNodeTimeout);
             setParentTask(parentTaskId);
+            this.metrics = metrics;
         }
 
         public Request(StreamInput in) throws IOException {
             super(in);
+            this.metrics = in.getTransportVersion().onOrAfter(TransportVersions.MASTER_NODE_METRICS)
+                ? in.readEnumSet(Metric.class)
+                : EnumSet.of(Metric.ALLOCATIONS, Metric.FS);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             assert out.getTransportVersion().onOrAfter(TransportVersions.ALLOCATION_STATS);
             super.writeTo(out);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.MASTER_NODE_METRICS)) {
+                out.writeEnumSet(metrics);
+            }
+        }
+
+        public EnumSet<Metric> metrics() {
+            return metrics;
         }
 
         @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestParameters.java

@@ -62,7 +62,7 @@ public class NodesStatsRequestParameters implements Writeable {
         this.indices = indices;
     }
 
-    public Set<Metric> requestedMetrics() {
+    public EnumSet<Metric> requestedMetrics() {
         return requestedMetrics;
     }
 

+ 7 - 6
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

@@ -57,8 +57,8 @@ public class TransportNodesStatsAction extends TransportNodesAction<
         ThreadPool threadPool,
         ClusterService clusterService,
         TransportService transportService,
-        NodeService nodeService,
         ActionFilters actionFilters,
+        NodeService nodeService,
         NodeClient client
     ) {
         super(
@@ -92,14 +92,15 @@ public class TransportNodesStatsAction extends TransportNodesAction<
                 TransportGetAllocationStatsAction.TYPE,
                 new TransportGetAllocationStatsAction.Request(
                     Objects.requireNonNullElse(request.timeout(), RestUtils.REST_MASTER_TIMEOUT_DEFAULT),
-                    new TaskId(clusterService.localNode().getId(), task.getId())
+                    new TaskId(clusterService.localNode().getId(), task.getId()),
+                    metrics
                 ),
-                listener.delegateFailure((l, r) -> {
-                    ActionListener.respondAndRelease(
+                listener.delegateFailure(
+                    (l, r) -> ActionListener.respondAndRelease(
                         l,
                         newResponse(request, merge(responses, r.getNodeAllocationStats(), r.getDiskThresholdSettings()), failures)
-                    );
-                })
+                    )
+                )
             );
         } else {
             ActionListener.run(listener, l -> ActionListener.respondAndRelease(l, newResponse(request, responses, failures)));

+ 1 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainActionTests.java

@@ -67,6 +67,7 @@ public class TransportClusterAllocationExplainActionTests extends ESTestCase {
         );
     }
 
+    @Override
     @After
     public void tearDown() throws Exception {
         super.tearDown();

+ 122 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.allocation;
+
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
+import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsTests;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.transport.CapturingTransport;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.anEmptyMap;
+import static org.hamcrest.Matchers.not;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TransportGetAllocationStatsActionTests extends ESTestCase {
+
+    private ThreadPool threadPool;
+    private ClusterService clusterService;
+    private TransportService transportService;
+    private AllocationStatsService allocationStatsService;
+    private FeatureService featureService;
+
+    private TransportGetAllocationStatsAction action;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        threadPool = new TestThreadPool(TransportClusterAllocationExplainActionTests.class.getName());
+        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+        transportService = new CapturingTransport().createTransportService(
+            clusterService.getSettings(),
+            threadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            address -> clusterService.localNode(),
+            clusterService.getClusterSettings(),
+            Set.of()
+        );
+        allocationStatsService = mock(AllocationStatsService.class);
+        featureService = mock(FeatureService.class);
+        action = new TransportGetAllocationStatsAction(
+            transportService,
+            clusterService,
+            threadPool,
+            new ActionFilters(Set.of()),
+            null,
+            allocationStatsService,
+            featureService
+        );
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        threadPool.shutdown();
+        clusterService.close();
+        transportService.close();
+    }
+
+    public void testReturnsOnlyRequestedStats() throws Exception {
+
+        var metrics = EnumSet.copyOf(randomSubsetOf(Metric.values().length, Metric.values()));
+
+        var request = new TransportGetAllocationStatsAction.Request(
+            TimeValue.ONE_MINUTE,
+            new TaskId(randomIdentifier(), randomNonNegativeLong()),
+            metrics
+        );
+
+        when(allocationStatsService.stats()).thenReturn(Map.of(randomIdentifier(), NodeAllocationStatsTests.randomNodeAllocationStats()));
+        when(featureService.clusterHasFeature(any(ClusterState.class), eq(AllocationStatsFeatures.INCLUDE_DISK_THRESHOLD_SETTINGS)))
+            .thenReturn(true);
+
+        var future = new PlainActionFuture<TransportGetAllocationStatsAction.Response>();
+        action.masterOperation(mock(Task.class), request, ClusterState.EMPTY_STATE, future);
+        var response = future.get();
+
+        if (metrics.contains(Metric.ALLOCATIONS)) {
+            assertThat(response.getNodeAllocationStats(), not(anEmptyMap()));
+            verify(allocationStatsService).stats();
+        } else {
+            assertThat(response.getNodeAllocationStats(), anEmptyMap());
+            verify(allocationStatsService, never()).stats();
+        }
+
+        if (metrics.contains(Metric.FS)) {
+            assertNotNull(response.getDiskThresholdSettings());
+        } else {
+            assertNull(response.getDiskThresholdSettings());
+        }
+    }
+}

+ 11 - 7
server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsTests.java

@@ -23,13 +23,7 @@ public class NodeAllocationStatsTests extends AbstractWireSerializingTestCase<No
 
     @Override
     protected NodeAllocationStats createTestInstance() {
-        return new NodeAllocationStats(
-            randomIntBetween(0, 10000),
-            randomIntBetween(0, 1000),
-            randomDoubleBetween(0, 8, true),
-            randomNonNegativeLong(),
-            randomNonNegativeLong()
-        );
+        return randomNodeAllocationStats();
     }
 
     @Override
@@ -73,4 +67,14 @@ public class NodeAllocationStatsTests extends AbstractWireSerializingTestCase<No
             default -> throw new RuntimeException("unreachable");
         };
     }
+
+    public static NodeAllocationStats randomNodeAllocationStats() {
+        return new NodeAllocationStats(
+            randomIntBetween(0, 10000),
+            randomIntBetween(0, 1000),
+            randomDoubleBetween(0, 8, true),
+            randomNonNegativeLong(),
+            randomNonNegativeLong()
+        );
+    }
 }

+ 5 - 5
server/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java

@@ -35,7 +35,7 @@ public class RestNodesStatsActionTests extends ESTestCase {
         action = new RestNodesStatsAction();
     }
 
-    public void testUnrecognizedMetric() throws IOException {
+    public void testUnrecognizedMetric() {
         final HashMap<String, String> params = new HashMap<>();
         final String metric = randomAlphaOfLength(64);
         params.put("metric", metric);
@@ -47,7 +47,7 @@ public class RestNodesStatsActionTests extends ESTestCase {
         assertThat(e, hasToString(containsString("request [/_nodes/stats] contains unrecognized metric: [" + metric + "]")));
     }
 
-    public void testUnrecognizedMetricDidYouMean() throws IOException {
+    public void testUnrecognizedMetricDidYouMean() {
         final HashMap<String, String> params = new HashMap<>();
         params.put("metric", "os,transprot,unrecognized");
         final RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("/_nodes/stats").withParams(params).build();
@@ -65,7 +65,7 @@ public class RestNodesStatsActionTests extends ESTestCase {
         );
     }
 
-    public void testAllRequestWithOtherMetrics() throws IOException {
+    public void testAllRequestWithOtherMetrics() {
         final HashMap<String, String> params = new HashMap<>();
         final String metric = randomFrom(Metric.ALL_NAMES);
         params.put("metric", "_all," + metric);
@@ -109,7 +109,7 @@ public class RestNodesStatsActionTests extends ESTestCase {
         );
     }
 
-    public void testIndexMetricsRequestWithoutIndicesMetric() throws IOException {
+    public void testIndexMetricsRequestWithoutIndicesMetric() {
         final HashMap<String, String> params = new HashMap<>();
         final Set<String> metrics = new HashSet<>(Metric.ALL_NAMES);
         metrics.remove("indices");
@@ -129,7 +129,7 @@ public class RestNodesStatsActionTests extends ESTestCase {
         );
     }
 
-    public void testIndexMetricsRequestOnAllRequest() throws IOException {
+    public void testIndexMetricsRequestOnAllRequest() {
         final HashMap<String, String> params = new HashMap<>();
         params.put("metric", "_all");
         final String indexMetric = randomSubsetOf(1, RestNodesStatsAction.FLAGS.keySet()).get(0);