Browse Source

Add indexing pressure stats to cluster stats (#80303)

`GET _nodes/stats` returns statistics about indexing pressure for each node.
With this commit, `GET _cluster/stats` also returns indexing pressure stats,
computed by aggregating the indexing pressure stats of every node in the
cluster.

Closes #79788
Olivier Cavadenti 3 years ago
parent
commit
90e4e8ce63

+ 33 - 0
docs/reference/cluster/stats.asciidoc

@@ -1477,6 +1477,39 @@ The API returns the following response:
           ...
         }
       },
+      "indexing_pressure": {
+        "memory": {
+            "current": {
+                "combined_coordinating_and_primary": "0b",
+                "combined_coordinating_and_primary_in_bytes": 0,
+                "coordinating": "0b",
+                "coordinating_in_bytes": 0,
+                "primary": "0b",
+                "primary_in_bytes": 0,
+                "replica": "0b",
+                "replica_in_bytes": 0,
+                "all": "0b",
+                "all_in_bytes": 0
+            },
+            "total": {
+                "combined_coordinating_and_primary": "0b",
+                "combined_coordinating_and_primary_in_bytes": 0,
+                "coordinating": "0b",
+                "coordinating_in_bytes": 0,
+                "primary": "0b",
+                "primary_in_bytes": 0,
+                "replica": "0b",
+                "replica_in_bytes": 0,
+                "all": "0b",
+                "all_in_bytes": 0,
+                "coordinating_rejections": 0,
+                "primary_rejections": 0,
+                "replica_rejections": 0
+            },
+            "limit": "0b",
+            "limit_in_bytes": 0
+        }
+      },
       "network_types": {
         ...
       },

+ 25 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.stats/20_indexing_pressure.yml

@@ -0,0 +1,25 @@
+---
+"Indexing pressure cluster stats":
+  - skip:
+      version: " - 8.0.99"
+      reason: "indexing_pressure in cluster was added in 8.1"
+
+  - do:
+      cluster.stats: {}
+
+  - gte:  { nodes.indexing_pressure.memory.current.combined_coordinating_and_primary_in_bytes: 0 }
+  - gte:  { nodes.indexing_pressure.memory.current.coordinating_in_bytes: 0 }
+  - gte:  { nodes.indexing_pressure.memory.current.primary_in_bytes: 0 }
+  - gte:  { nodes.indexing_pressure.memory.current.replica_in_bytes: 0 }
+  - gte:  { nodes.indexing_pressure.memory.current.all_in_bytes: 0 }
+
+  - gte:  { nodes.indexing_pressure.memory.total.combined_coordinating_and_primary_in_bytes: 0 }
+  - gte:  { nodes.indexing_pressure.memory.total.coordinating_in_bytes: 0 }
+  - gte:  { nodes.indexing_pressure.memory.total.primary_in_bytes: 0 }
+  - gte:  { nodes.indexing_pressure.memory.total.replica_in_bytes: 0 }
+  - gte:  { nodes.indexing_pressure.memory.total.all_in_bytes: 0 }
+
+  - gte:  { nodes.indexing_pressure.memory.total.coordinating_rejections: 0 }
+  - gte:  { nodes.indexing_pressure.memory.total.primary_rejections: 0 }
+  - gte:  { nodes.indexing_pressure.memory.total.replica_rejections: 0 }
+  - gte:  { nodes.indexing_pressure.memory.limit_in_bytes: 0 }

+ 82 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.discovery.DiscoveryModule;
+import org.elasticsearch.index.stats.IndexingPressureStats;
 import org.elasticsearch.monitor.fs.FsInfo;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.monitor.os.OsInfo;
@@ -59,6 +60,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
     private final DiscoveryTypes discoveryTypes;
     private final PackagingTypes packagingTypes;
     private final IngestStats ingestStats;
+    private final IndexPressureStats indexPressureStats;
 
     ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
         this.versions = new HashSet<>();
@@ -92,6 +94,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
         this.discoveryTypes = new DiscoveryTypes(nodeInfos);
         this.packagingTypes = new PackagingTypes(nodeInfos);
         this.ingestStats = new IngestStats(nodeStats);
+        this.indexPressureStats = new IndexPressureStats(nodeStats);
     }
 
     public Counts getCounts() {
@@ -176,6 +179,8 @@ public class ClusterStatsNodes implements ToXContentFragment {
 
         ingestStats.toXContent(builder, params);
 
+        indexPressureStats.toXContent(builder, params);
+
         return builder;
     }
 
@@ -769,4 +774,81 @@ public class ClusterStatsNodes implements ToXContentFragment {
 
     }
 
+    /**
+     * Cluster-wide indexing pressure statistics, obtained by summing the
+     * {@link IndexingPressureStats} reported by every node.
+     * <p>
+     * Nodes whose {@link NodeStats#getIndexingPressureStats()} is {@code null} contribute nothing to the sums.
+     * Every counter, including the memory limit, is a plain {@code long} sum over the nodes; no overflow
+     * detection is performed.
+     */
+    static class IndexPressureStats implements ToXContentFragment {
+
+        /** Holds the per-field sums across all nodes; rendering is delegated to it. */
+        private final IndexingPressureStats indexingPressureStats;
+
+        /**
+         * Aggregates the indexing pressure stats of the given nodes.
+         *
+         * @param nodeStats stats of the nodes to aggregate; entries without indexing pressure stats are ignored
+         */
+        IndexPressureStats(final List<NodeStats> nodeStats) {
+            long totalCombinedCoordinatingAndPrimaryBytes = 0;
+            long totalCoordinatingBytes = 0;
+            long totalPrimaryBytes = 0;
+            long totalReplicaBytes = 0;
+
+            long currentCombinedCoordinatingAndPrimaryBytes = 0;
+            long currentCoordinatingBytes = 0;
+            long currentPrimaryBytes = 0;
+            long currentReplicaBytes = 0;
+            long coordinatingRejections = 0;
+            long primaryRejections = 0;
+            long replicaRejections = 0;
+            long memoryLimit = 0;
+
+            long totalCoordinatingOps = 0;
+            long totalPrimaryOps = 0;
+            long totalReplicaOps = 0;
+            long currentCoordinatingOps = 0;
+            long currentPrimaryOps = 0;
+            long currentReplicaOps = 0;
+            for (NodeStats nodeStat : nodeStats) {
+                IndexingPressureStats nodeStatIndexingPressureStats = nodeStat.getIndexingPressureStats();
+                if (nodeStatIndexingPressureStats != null) {
+                    totalCombinedCoordinatingAndPrimaryBytes += nodeStatIndexingPressureStats.getTotalCombinedCoordinatingAndPrimaryBytes();
+                    totalCoordinatingBytes += nodeStatIndexingPressureStats.getTotalCoordinatingBytes();
+                    totalPrimaryBytes += nodeStatIndexingPressureStats.getTotalPrimaryBytes();
+                    totalReplicaBytes += nodeStatIndexingPressureStats.getTotalReplicaBytes();
+                    currentCombinedCoordinatingAndPrimaryBytes += nodeStatIndexingPressureStats
+                        .getCurrentCombinedCoordinatingAndPrimaryBytes();
+                    currentCoordinatingBytes += nodeStatIndexingPressureStats.getCurrentCoordinatingBytes();
+                    currentPrimaryBytes += nodeStatIndexingPressureStats.getCurrentPrimaryBytes();
+                    currentReplicaBytes += nodeStatIndexingPressureStats.getCurrentReplicaBytes();
+                    coordinatingRejections += nodeStatIndexingPressureStats.getCoordinatingRejections();
+                    primaryRejections += nodeStatIndexingPressureStats.getPrimaryRejections();
+                    replicaRejections += nodeStatIndexingPressureStats.getReplicaRejections();
+                    // The cluster-level limit is the sum of the per-node limits.
+                    memoryLimit += nodeStatIndexingPressureStats.getMemoryLimit();
+                    totalCoordinatingOps += nodeStatIndexingPressureStats.getTotalCoordinatingOps();
+                    // Previously omitted, which left the aggregated total primary ops stuck at 0.
+                    totalPrimaryOps += nodeStatIndexingPressureStats.getTotalPrimaryOps();
+                    totalReplicaOps += nodeStatIndexingPressureStats.getTotalReplicaOps();
+                    currentCoordinatingOps += nodeStatIndexingPressureStats.getCurrentCoordinatingOps();
+                    currentPrimaryOps += nodeStatIndexingPressureStats.getCurrentPrimaryOps();
+                    currentReplicaOps += nodeStatIndexingPressureStats.getCurrentReplicaOps();
+                }
+            }
+            // Argument order must match the IndexingPressureStats constructor exactly: all parameters are longs,
+            // so a swapped pair would compile silently.
+            indexingPressureStats = new IndexingPressureStats(
+                totalCombinedCoordinatingAndPrimaryBytes,
+                totalCoordinatingBytes,
+                totalPrimaryBytes,
+                totalReplicaBytes,
+                currentCombinedCoordinatingAndPrimaryBytes,
+                currentCoordinatingBytes,
+                currentPrimaryBytes,
+                currentReplicaBytes,
+                coordinatingRejections,
+                primaryRejections,
+                replicaRejections,
+                memoryLimit,
+                totalCoordinatingOps,
+                totalPrimaryOps,
+                totalReplicaOps,
+                currentCoordinatingOps,
+                currentPrimaryOps,
+                currentReplicaOps
+            );
+        }
+
+        /**
+         * Writes the aggregated stats by delegating to {@link IndexingPressureStats#toXContent}.
+         *
+         * @param builder the builder to write into
+         * @param params  serialization parameters, passed through unchanged
+         * @return the builder returned by the delegate
+         * @throws IOException if the delegate fails to write
+         */
+        @Override
+        public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
+            return indexingPressureStats.toXContent(builder, params);
+        }
+
+    }
+
 }

+ 4 - 0
server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java

@@ -202,6 +202,10 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
         return currentReplicaOps;
     }
 
+    /**
+     * Returns the value of this instance's {@code memoryLimit} field.
+     * NOTE(review): presumably the indexing pressure memory limit in bytes (the value rendered as
+     * {@code limit_in_bytes}) - confirm against the serialization code.
+     *
+     * @return the stored memory limit
+     */
+    public long getMemoryLimit() {
+        return memoryLimit;
+    }
+
     private static final String COMBINED = "combined_coordinating_and_primary";
     private static final String COMBINED_IN_BYTES = "combined_coordinating_and_primary_in_bytes";
     private static final String COORDINATING = "coordinating";

+ 26 - 1
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.network.HandlingTimeTracker;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.discovery.DiscoveryStats;
 import org.elasticsearch.http.HttpStats;
+import org.elasticsearch.index.stats.IndexingPressureStats;
 import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
 import org.elasticsearch.indices.breaker.CircuitBreakerStats;
 import org.elasticsearch.ingest.IngestStats;
@@ -859,6 +860,30 @@ public class NodeStatsTests extends ESTestCase {
             adaptiveSelectionStats = new AdaptiveSelectionStats(nodeConnections, nodeStats);
         }
         ScriptCacheStats scriptCacheStats = scriptStats != null ? scriptStats.toScriptCacheStats() : null;
+        IndexingPressureStats indexingPressureStats = null;
+        if (frequently()) {
+            long maxStatValue = Long.MAX_VALUE / 5;
+            indexingPressureStats = new IndexingPressureStats(
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue)
+            );
+        }
         // TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
         return new NodeStats(
             node,
@@ -877,7 +902,7 @@ public class NodeStatsTests extends ESTestCase {
             ingestStats,
             adaptiveSelectionStats,
             scriptCacheStats,
-            null
+            indexingPressureStats
         );
     }
 

+ 83 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.stats.IndexingPressureStats;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -120,6 +121,88 @@ public class ClusterStatsNodesTests extends ESTestCase {
         );
     }
 
+    /**
+     * Checks that {@link ClusterStatsNodes.IndexPressureStats} renders, for each field, the sum of the values
+     * reported by the individual nodes.
+     */
+    public void testIndexPressureStats() throws Exception {
+        // Two random nodes; the predicate is meant to exclude nodes generated without indexing pressure stats.
+        List<NodeStats> nodeStats = Arrays.asList(
+            randomValueOtherThanMany(n -> n.getIndexingPressureStats() == null, NodeStatsTests::createNodeStats),
+            randomValueOtherThanMany(n -> n.getIndexingPressureStats() == null, NodeStatsTests::createNodeStats)
+        );
+        // Slot layout: [0..3] current bytes (combined, coordinating, primary, replica),
+        // [4..7] total bytes in the same order, [8..10] rejections (coordinating, primary, replica),
+        // [11] memory limit.
+        long[] expectedStats = new long[12];
+        for (NodeStats nodeStat : nodeStats) {
+            IndexingPressureStats indexingPressureStats = nodeStat.getIndexingPressureStats();
+            // NOTE(review): presumably always true given how nodeStats is generated above - confirm.
+            if (indexingPressureStats != null) {
+                expectedStats[0] += indexingPressureStats.getCurrentCombinedCoordinatingAndPrimaryBytes();
+                expectedStats[1] += indexingPressureStats.getCurrentCoordinatingBytes();
+                expectedStats[2] += indexingPressureStats.getCurrentPrimaryBytes();
+                expectedStats[3] += indexingPressureStats.getCurrentReplicaBytes();
+
+                expectedStats[4] += indexingPressureStats.getTotalCombinedCoordinatingAndPrimaryBytes();
+                expectedStats[5] += indexingPressureStats.getTotalCoordinatingBytes();
+                expectedStats[6] += indexingPressureStats.getTotalPrimaryBytes();
+                expectedStats[7] += indexingPressureStats.getTotalReplicaBytes();
+
+                expectedStats[8] += indexingPressureStats.getCoordinatingRejections();
+                expectedStats[9] += indexingPressureStats.getPrimaryRejections();
+                expectedStats[10] += indexingPressureStats.getReplicaRejections();
+
+                expectedStats[11] += indexingPressureStats.getMemoryLimit();
+            }
+        }
+
+        ClusterStatsNodes.IndexPressureStats indexPressureStats = new ClusterStatsNodes.IndexPressureStats(nodeStats);
+        // The expected JSON contains only the *_in_bytes and rejection fields.
+        // "all_in_bytes" is expected to equal replica bytes + combined coordinating-and-primary bytes.
+        assertThat(
+            toXContent(indexPressureStats, XContentType.JSON, false).utf8ToString(),
+            equalTo(
+                "{\"indexing_pressure\":{"
+                    + "\"memory\":{"
+                    + "\"current\":{"
+                    + "\"combined_coordinating_and_primary_in_bytes\":"
+                    + expectedStats[0]
+                    + ","
+                    + "\"coordinating_in_bytes\":"
+                    + expectedStats[1]
+                    + ","
+                    + "\"primary_in_bytes\":"
+                    + expectedStats[2]
+                    + ","
+                    + "\"replica_in_bytes\":"
+                    + expectedStats[3]
+                    + ","
+                    + "\"all_in_bytes\":"
+                    + (expectedStats[3] + expectedStats[0])
+                    + "},"
+                    + "\"total\":{"
+                    + "\"combined_coordinating_and_primary_in_bytes\":"
+                    + expectedStats[4]
+                    + ","
+                    + "\"coordinating_in_bytes\":"
+                    + expectedStats[5]
+                    + ","
+                    + "\"primary_in_bytes\":"
+                    + expectedStats[6]
+                    + ","
+                    + "\"replica_in_bytes\":"
+                    + expectedStats[7]
+                    + ","
+                    + "\"all_in_bytes\":"
+                    + (expectedStats[7] + expectedStats[4])
+                    + ","
+                    + "\"coordinating_rejections\":"
+                    + expectedStats[8]
+                    + ","
+                    + "\"primary_rejections\":"
+                    + expectedStats[9]
+                    + ","
+                    + "\"replica_rejections\":"
+                    + expectedStats[10]
+                    + "},"
+                    + "\"limit_in_bytes\":"
+                    + expectedStats[11]
+                    + "}"
+                    + "}}"
+            )
+        );
+    }
+
     private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) {
         Settings.Builder settings = Settings.builder();
         if (transportType != null) {

+ 23 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java

@@ -669,7 +669,29 @@ public class ClusterStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Cl
                 + "      \"ingest\": {"
                 + "        \"number_of_pipelines\": 0,"
                 + "        \"processor_stats\": {}"
-                + "      }"
+                + "      },"
+                + "      \"indexing_pressure\": {"
+                + "         \"memory\": {"
+                + "             \"current\" :{"
+                + "                 \"combined_coordinating_and_primary_in_bytes\": 0,"
+                + "                 \"coordinating_in_bytes\": 0,"
+                + "                 \"primary_in_bytes\": 0,"
+                + "                 \"replica_in_bytes\": 0,"
+                + "                 \"all_in_bytes\": 0"
+                + "             },"
+                + "             \"total\": {"
+                + "                 \"combined_coordinating_and_primary_in_bytes\": 0,"
+                + "                 \"coordinating_in_bytes\": 0,"
+                + "                 \"primary_in_bytes\": 0,"
+                + "                 \"replica_in_bytes\": 0,"
+                + "                 \"all_in_bytes\": 0,"
+                + "                 \"coordinating_rejections\": 0,"
+                + "                 \"primary_rejections\": 0,"
+                + "                 \"replica_rejections\": 0"
+                + "             },"
+                + "             \"limit_in_bytes\": 0"
+                + "         }"
+                + "       }"
                 + "    }"
                 + "  },"
                 + "  \"cluster_state\": {"