Explorar o código

Adding indexing pressure stats to node stats API (#59247)

We have recently added internal metrics to monitor the amount of
indexing occurring on a node. These metrics introduce back pressure to
indexing when memory utilization is too high. This commit exposes these
stats through the node stats API.
Tim Brooks %!s(int64=5) %!d(string=hai) anos
pai
achega
b87bb86d88
Modificáronse 37 ficheiros con 541 adicións e 231 borrados
  1. 129 0
      qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java
  2. 8 4
      rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json
  3. 28 0
      rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml
  4. 61 58
      server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java
  5. 26 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
  6. 2 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java
  7. 5 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java
  8. 2 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
  9. 1 1
      server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
  10. 7 6
      server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  11. 3 2
      server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
  12. 0 92
      server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java
  13. 3 3
      server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java
  14. 7 7
      server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java
  15. 2 2
      server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
  16. 111 0
      server/src/main/java/org/elasticsearch/index/IndexingPressure.java
  17. 3 3
      server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java
  18. 85 0
      server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java
  19. 4 4
      server/src/main/java/org/elasticsearch/node/Node.java
  20. 8 4
      server/src/main/java/org/elasticsearch/node/NodeService.java
  21. 1 1
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java
  22. 2 1
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
  23. 2 1
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
  24. 2 1
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
  25. 2 1
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java
  26. 2 2
      server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java
  27. 3 3
      server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
  28. 6 6
      server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
  29. 4 4
      server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java
  30. 4 4
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  31. 1 1
      test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java
  32. 5 5
      test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
  33. 3 3
      x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java
  34. 6 6
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
  35. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java
  36. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
  37. 1 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

+ 129 - 0
qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndexingPressureRestIT.java

@@ -0,0 +1,129 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.http;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.index.IndexingPressure;
+import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
+import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.XContentTestUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.elasticsearch.rest.RestStatus.CREATED;
+import static org.elasticsearch.rest.RestStatus.OK;
+import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * Test Indexing Pressure Metrics and Statistics
+ */
+@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
+public class IndexingPressureRestIT extends HttpSmokeTestCase {
+
+    private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build();
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
+            .put(unboundedWriteQueue)
+            .build();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testIndexingPressureStats() throws IOException {
+        Request createRequest = new Request("PUT", "/index_name");
+        createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " +
+            "\"write.wait_for_active_shards\": 2}}}");
+        final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
+        assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
+
+        Request successfulIndexingRequest = new Request("POST", "/index_name/_doc/");
+        successfulIndexingRequest.setJsonEntity("{\"x\": \"small text\"}");
+        final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
+        assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(CREATED.getStatus()));
+
+        Request getNodeStats = new Request("GET", "/_nodes/stats/indexing_pressure");
+        final Response nodeStats = getRestClient().performRequest(getNodeStats);
+        Map<String, Object> nodeStatsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats.getEntity().getContent(), true);
+        ArrayList<Object> values = new ArrayList<>(((Map<Object, Object>) nodeStatsMap.get("nodes")).values());
+        assertThat(values.size(), equalTo(2));
+        XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(0));
+        Integer node1IndexingBytes = node1.get("indexing_pressure.total.coordinating_and_primary_bytes");
+        Integer node1ReplicaBytes = node1.get("indexing_pressure.total.replica_bytes");
+        Integer node1Rejections = node1.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
+        XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map<String, Object>) values.get(1));
+        Integer node2IndexingBytes = node2.get("indexing_pressure.total.coordinating_and_primary_bytes");
+        Integer node2ReplicaBytes = node2.get("indexing_pressure.total.replica_bytes");
+        Integer node2Rejections = node2.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
+
+        if (node1IndexingBytes == 0) {
+            assertThat(node2IndexingBytes, greaterThan(0));
+            assertThat(node2IndexingBytes, lessThan(1024));
+        } else {
+            assertThat(node1IndexingBytes, greaterThan(0));
+            assertThat(node1IndexingBytes, lessThan(1024));
+        }
+
+        if (node1ReplicaBytes == 0) {
+            assertThat(node2ReplicaBytes, greaterThan(0));
+            assertThat(node2ReplicaBytes, lessThan(1024));
+        } else {
+            assertThat(node2ReplicaBytes, equalTo(0));
+            assertThat(node1ReplicaBytes, lessThan(1024));
+        }
+
+        assertThat(node1Rejections, equalTo(0));
+        assertThat(node2Rejections, equalTo(0));
+
+        Request failedIndexingRequest = new Request("POST", "/index_name/_doc/");
+        String largeString = randomAlphaOfLength(10000);
+        failedIndexingRequest.setJsonEntity("{\"x\": " + largeString + "}");
+        ResponseException exception = expectThrows(ResponseException.class, () -> getRestClient().performRequest(failedIndexingRequest));
+        assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(TOO_MANY_REQUESTS.getStatus()));
+
+        Request getNodeStats2 = new Request("GET", "/_nodes/stats/indexing_pressure");
+        final Response nodeStats2 = getRestClient().performRequest(getNodeStats2);
+        Map<String, Object> nodeStatsMap2 = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats2.getEntity().getContent(),
+            true);
+        ArrayList<Object> values2 = new ArrayList<>(((Map<Object, Object>) nodeStatsMap2.get("nodes")).values());
+        assertThat(values2.size(), equalTo(2));
+        XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(0));
+        node1Rejections = node1AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
+        XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(1));
+        node2Rejections = node2AfterRejection.get("indexing_pressure.total.coordinating_and_primary_memory_limit_rejections");
+
+        if (node1Rejections == 0) {
+            assertThat(node2Rejections, equalTo(1));
+        } else {
+            assertThat(node1Rejections, equalTo(1));
+        }
+    }
+}

+ 8 - 4
rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json

@@ -44,7 +44,8 @@
                 "process",
                 "thread_pool",
                 "transport",
-                "discovery"
+                "discovery",
+                "indexing_pressure"
               ],
               "description":"Limit the information returned to the specified metrics"
             }
@@ -69,7 +70,8 @@
                 "process",
                 "thread_pool",
                 "transport",
-                "discovery"
+                "discovery",
+                "indexing_pressure"
               ],
               "description":"Limit the information returned to the specified metrics"
             },
@@ -98,7 +100,8 @@
                 "process",
                 "thread_pool",
                 "transport",
-                "discovery"
+                "discovery",
+                "indexing_pressure"
               ],
               "description":"Limit the information returned to the specified metrics"
             },
@@ -146,7 +149,8 @@
                 "process",
                 "thread_pool",
                 "transport",
-                "discovery"
+                "discovery",
+                "indexing_pressure"
               ],
               "description":"Limit the information returned to the specified metrics"
             },

+ 28 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/50_indexing_pressure.yml

@@ -0,0 +1,28 @@
+---
+"Indexing pressure stats":
+  - skip:
+      version: " - 7.99.99"
+      reason: "indexing_pressure not in prior versions"
+      features: [arbitrary_key]
+
+  - do:
+      nodes.info: {}
+  - set:
+      nodes._arbitrary_key_: node_id
+
+  - do:
+      nodes.stats:
+        metric: [ indexing_pressure ]
+
+  - gte:  { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_bytes: 0 }
+  - gte:  { nodes.$node_id.indexing_pressure.total.replica_bytes: 0 }
+  - gte:  { nodes.$node_id.indexing_pressure.total.all_bytes: 0 }
+  - gte:  { nodes.$node_id.indexing_pressure.total.coordinating_and_primary_memory_limit_rejections: 0 }
+  - gte:  { nodes.$node_id.indexing_pressure.total.replica_memory_limit_rejections: 0 }
+  - gte:  { nodes.$node_id.indexing_pressure.current.coordinating_and_primary_bytes: 0 }
+  - gte:  { nodes.$node_id.indexing_pressure.current.replica_bytes: 0 }
+  - gte:  { nodes.$node_id.indexing_pressure.current.all_bytes: 0 }
+
+# TODO:
+#
+# Change skipped version after backport

+ 61 - 58
server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java → server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java

@@ -16,11 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.action.bulk;
+package org.elasticsearch.index;
 
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.bulk.TransportShardBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -51,7 +54,7 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
-public class WriteMemoryLimitsIT extends ESIntegTestCase {
+public class IndexingPressureIT extends ESIntegTestCase {
 
     // TODO: Add additional REST tests when metrics are exposed
 
@@ -63,7 +66,6 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
     protected Settings nodeSettings(int nodeOrdinal) {
         return Settings.builder()
             .put(super.nodeSettings(nodeOrdinal))
-            // Need at least two threads because we are going to block one
             .put(unboundedWriteQueue)
             .build();
     }
@@ -134,16 +136,16 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
             final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
             replicationSendPointReached.await();
 
-            WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
-            WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
-            WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
+            IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
+            IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
+            IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
 
-            assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
-            assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
-            assertEquals(0, replicaWriteLimits.getWriteBytes());
-            assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
-            assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
-            assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
+            assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
+            assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
+            assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
+            assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
 
             latchBlockingReplicationSend.countDown();
 
@@ -165,14 +167,15 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
             final long secondBulkShardRequestSize = request.ramBytesUsed();
 
             if (usePrimaryAsCoordinatingNode) {
-                assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize));
-                assertEquals(0, replicaWriteLimits.getWriteBytes());
+                assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(),
+                    greaterThan(bulkShardRequestSize + secondBulkRequestSize));
+                assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
             } else {
-                assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
-                assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes());
+                assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
+                assertEquals(secondBulkRequestSize, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
             }
-            assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
-            assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
+            assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertBusy(() -> assertThat(replicaWriteLimits.getCurrentReplicaBytes(),
                 greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));
 
             replicaRelease.close();
@@ -180,12 +183,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
             successFuture.actionGet();
             secondFuture.actionGet();
 
-            assertEquals(0, primaryWriteLimits.getWriteBytes());
-            assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
-            assertEquals(0, replicaWriteLimits.getWriteBytes());
-            assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
-            assertEquals(0, coordinatingWriteLimits.getWriteBytes());
-            assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
+            assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
+            assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
+            assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
         } finally {
             if (replicationSendPointReached.getCount() > 0) {
                 replicationSendPointReached.countDown();
@@ -212,8 +215,8 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 
         final long bulkRequestSize = bulkRequest.ramBytesUsed();
         final long bulkShardRequestSize = totalRequestSize;
-        restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(),
-            (long)(bulkShardRequestSize * 1.5) + "B").build());
+        restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(),
+            (long) (bulkShardRequestSize * 1.5) + "B").build());
 
         assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
@@ -229,17 +232,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
         try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
             final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
 
-            WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
-            WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
-            WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
+            IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
+            IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
+            IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
 
             assertBusy(() -> {
-                assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
-                assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
-                assertEquals(0, replicaWriteLimits.getWriteBytes());
-                assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize));
-                assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
-                assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
+                assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
+                assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
+                assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+                assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize));
+                assertEquals(bulkRequestSize, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+                assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
             });
 
             expectThrows(EsRejectedExecutionException.class, () -> {
@@ -256,12 +259,12 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 
             successFuture.actionGet();
 
-            assertEquals(0, primaryWriteLimits.getWriteBytes());
-            assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
-            assertEquals(0, replicaWriteLimits.getWriteBytes());
-            assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
-            assertEquals(0, coordinatingWriteLimits.getWriteBytes());
-            assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
+            assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
+            assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
+            assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
         }
     }
 
@@ -276,7 +279,7 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
             bulkRequest.add(request);
         }
         final long bulkShardRequestSize = totalRequestSize;
-        restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(),
+        restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(),
             (long)(bulkShardRequestSize * 1.5) + "B").build());
 
         assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
@@ -293,17 +296,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
         try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
             final ActionFuture<BulkResponse> successFuture = client(primaryName).bulk(bulkRequest);
 
-            WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
-            WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
-            WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);
+            IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
+            IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
+            IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
 
             assertBusy(() -> {
-                assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
-                assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
-                assertEquals(0, replicaWriteLimits.getWriteBytes());
-                assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize));
-                assertEquals(0, coordinatingWriteLimits.getWriteBytes());
-                assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
+                assertThat(primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes(), greaterThan(bulkShardRequestSize));
+                assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
+                assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+                assertThat(replicaWriteLimits.getCurrentReplicaBytes(), greaterThan(bulkShardRequestSize));
+                assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+                assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
             });
 
             BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet();
@@ -314,17 +317,17 @@ public class WriteMemoryLimitsIT extends ESIntegTestCase {
 
             successFuture.actionGet();
 
-            assertEquals(0, primaryWriteLimits.getWriteBytes());
-            assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
-            assertEquals(0, replicaWriteLimits.getWriteBytes());
-            assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
-            assertEquals(0, coordinatingWriteLimits.getWriteBytes());
-            assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
+            assertEquals(0, primaryWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, primaryWriteLimits.getCurrentReplicaBytes());
+            assertEquals(0, replicaWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, replicaWriteLimits.getCurrentReplicaBytes());
+            assertEquals(0, coordinatingWriteLimits.getCurrentCoordinatingAndPrimaryBytes());
+            assertEquals(0, coordinatingWriteLimits.getCurrentReplicaBytes());
         }
     }
 
     public void testWritesWillSucceedIfBelowThreshold() throws Exception {
-        restartNodesWithSettings(Settings.builder().put(WriteMemoryLimits.MAX_INDEXING_BYTES.getKey(), "1MB").build());
+        restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1MB").build());
         assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));

+ 26 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.admin.cluster.node.stats;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -29,6 +30,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.discovery.DiscoveryStats;
 import org.elasticsearch.http.HttpStats;
+import org.elasticsearch.index.stats.IndexingPressureStats;
 import org.elasticsearch.indices.NodeIndicesStats;
 import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
 import org.elasticsearch.ingest.IngestStats;
@@ -90,6 +92,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
     @Nullable
     private AdaptiveSelectionStats adaptiveSelectionStats;
 
+    @Nullable
+    private IndexingPressureStats indexingPressureStats;
+
     public NodeStats(StreamInput in) throws IOException {
         super(in);
         timestamp = in.readVLong();
@@ -108,6 +113,12 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         discoveryStats = in.readOptionalWriteable(DiscoveryStats::new);
         ingestStats = in.readOptionalWriteable(IngestStats::new);
         adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new);
+        // TODO: Change after backport
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new);
+        } else {
+            indexingPressureStats = null;
+        }
     }
 
     public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices,
@@ -117,7 +128,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
                      @Nullable ScriptStats scriptStats,
                      @Nullable DiscoveryStats discoveryStats,
                      @Nullable IngestStats ingestStats,
-                     @Nullable AdaptiveSelectionStats adaptiveSelectionStats) {
+                     @Nullable AdaptiveSelectionStats adaptiveSelectionStats,
+                     @Nullable IndexingPressureStats indexingPressureStats) {
         super(node);
         this.timestamp = timestamp;
         this.indices = indices;
@@ -133,6 +145,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         this.discoveryStats = discoveryStats;
         this.ingestStats = ingestStats;
         this.adaptiveSelectionStats = adaptiveSelectionStats;
+        this.indexingPressureStats = indexingPressureStats;
     }
 
     public long getTimestamp() {
@@ -227,6 +240,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         return adaptiveSelectionStats;
     }
 
+    @Nullable
+    public IndexingPressureStats getIndexingPressureStats() {
+        return indexingPressureStats;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -249,6 +267,10 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         out.writeOptionalWriteable(discoveryStats);
         out.writeOptionalWriteable(ingestStats);
         out.writeOptionalWriteable(adaptiveSelectionStats);
+        // TODO: Change after backport
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalWriteable(indexingPressureStats);
+        }
     }
 
     @Override
@@ -312,6 +334,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
         if (getAdaptiveSelectionStats() != null) {
             getAdaptiveSelectionStats().toXContent(builder, params);
         }
+        if (getIndexingPressureStats() != null) {
+            getIndexingPressureStats().toXContent(builder, params);
+        }
         return builder;
     }
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java

@@ -220,7 +220,8 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         DISCOVERY("discovery"),
         INGEST("ingest"),
         ADAPTIVE_SELECTION("adaptive_selection"),
-        SCRIPT_CACHE("script_cache");
+        SCRIPT_CACHE("script_cache"),
+        INDEXING_PRESSURE("indexing_pressure"),;
 
         private String metricName;
 

+ 5 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java

@@ -157,6 +157,11 @@ public class NodesStatsRequestBuilder
         return this;
     }
 
+    public NodesStatsRequestBuilder setIndexingPressure(boolean indexingPressure) {
+        addOrRemoveMetric(indexingPressure, NodesStatsRequest.Metric.INDEXING_PRESSURE);
+        return this;
+    }
+
     /**
      * Helper method for adding metrics to a request
      */

+ 2 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

@@ -84,7 +84,8 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
             NodesStatsRequest.Metric.DISCOVERY.containedIn(metrics),
             NodesStatsRequest.Metric.INGEST.containedIn(metrics),
             NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
-            NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics));
+            NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
+            NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics));
     }
 
     public static class NodeStatsRequest extends TransportRequest {

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

@@ -98,7 +98,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
     protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) {
         NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
         NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
-                true, true, true, false, true, false, false, false, false, false, true, false, false);
+                true, true, true, false, true, false, false, false, false, false, true, false, false, false);
         List<ShardStats> shardsStats = new ArrayList<>();
         for (IndexService indexService : indicesService) {
             for (IndexShard indexShard : indexService) {

+ 7 - 6
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -68,6 +68,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndexClosedException;
@@ -112,21 +113,21 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
     private final NodeClient client;
     private final IndexNameExpressionResolver indexNameExpressionResolver;
     private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
-    private final WriteMemoryLimits writeMemoryLimits;
+    private final IndexingPressure indexingPressure;
 
     @Inject
     public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
                                ClusterService clusterService, IngestService ingestService,
                                NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                               AutoCreateIndex autoCreateIndex, WriteMemoryLimits writeMemoryLimits) {
+                               AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure) {
         this(threadPool, transportService, clusterService, ingestService, client, actionFilters,
-            indexNameExpressionResolver, autoCreateIndex, writeMemoryLimits, System::nanoTime);
+            indexNameExpressionResolver, autoCreateIndex, indexingPressure, System::nanoTime);
     }
 
     public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
                                ClusterService clusterService, IngestService ingestService,
                                NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                               AutoCreateIndex autoCreateIndex, WriteMemoryLimits writeMemoryLimits, LongSupplier relativeTimeProvider) {
+                               AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, LongSupplier relativeTimeProvider) {
         super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
         Objects.requireNonNull(relativeTimeProvider);
         this.threadPool = threadPool;
@@ -137,7 +138,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         this.ingestForwarder = new IngestActionForwarder(transportService);
         this.client = client;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
-        this.writeMemoryLimits = writeMemoryLimits;
+        this.indexingPressure = indexingPressure;
         clusterService.addStateApplier(this.ingestForwarder);
     }
 
@@ -162,7 +163,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
     @Override
     protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
         long indexingBytes = bulkRequest.ramBytesUsed();
-        final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes);
+        final Releasable releasable = indexingPressure.markIndexingOperationStarted(indexingBytes);
         final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
         try {
             doInternalExecute(task, bulkRequest, releasingListener);

+ 3 - 2
server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -56,6 +56,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.get.GetResult;
@@ -93,9 +94,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
                                     IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
                                     MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
-                                    WriteMemoryLimits writeMemoryLimits) {
+                                    IndexingPressure indexingPressure) {
         super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
-            BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false, writeMemoryLimits);
+            BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false, indexingPressure);
         this.updateHelper = updateHelper;
         this.mappingUpdatedAction = mappingUpdatedAction;
     }

+ 0 - 92
server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java

@@ -1,92 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.bulk;
-
-import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.settings.ClusterSettings;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class WriteMemoryLimits {
-
-    public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
-        Setting.memorySizeSetting("indexing_limits.memory.limit", "10%", Setting.Property.NodeScope);
-
-    private final AtomicLong writeBytes = new AtomicLong(0);
-    private final AtomicLong replicaWriteBytes = new AtomicLong(0);
-    private final long writeLimits;
-
-    public WriteMemoryLimits(Settings settings) {
-        this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
-    }
-
-    public WriteMemoryLimits(Settings settings, ClusterSettings clusterSettings) {
-        this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
-    }
-
-    public Releasable markWriteOperationStarted(long bytes) {
-        return markWriteOperationStarted(bytes, false);
-    }
-
-    public Releasable markWriteOperationStarted(long bytes, boolean forceExecution) {
-        long currentWriteLimits = this.writeLimits;
-        long writeBytes = this.writeBytes.addAndGet(bytes);
-        long replicaWriteBytes = this.replicaWriteBytes.get();
-        long totalBytes = writeBytes + replicaWriteBytes;
-        if (forceExecution == false && totalBytes > currentWriteLimits) {
-            long bytesWithoutOperation = writeBytes - bytes;
-            long totalBytesWithoutOperation = totalBytes - bytes;
-            this.writeBytes.getAndAdd(-bytes);
-            throw new EsRejectedExecutionException("rejected execution of write operation [" +
-                "write_bytes=" + bytesWithoutOperation + ", " +
-                "replica_write_bytes=" + replicaWriteBytes + ", " +
-                "total_write_bytes=" + totalBytesWithoutOperation + ", " +
-                "current_operation_bytes=" + bytes + ", " +
-                "max_write_bytes=" + currentWriteLimits + "]", false);
-        }
-        return () -> this.writeBytes.getAndAdd(-bytes);
-    }
-
-    public long getWriteBytes() {
-        return writeBytes.get();
-    }
-
-    public Releasable markReplicaWriteStarted(long bytes, boolean forceExecution) {
-        long currentReplicaWriteLimits = (long) (this.writeLimits * 1.5);
-        long replicaWriteBytes = this.replicaWriteBytes.getAndAdd(bytes);
-        if (forceExecution == false && replicaWriteBytes > currentReplicaWriteLimits) {
-            long replicaBytesWithoutOperation = replicaWriteBytes - bytes;
-            this.replicaWriteBytes.getAndAdd(-bytes);
-            throw new EsRejectedExecutionException("rejected execution of replica write operation [" +
-                "replica_write_bytes=" + replicaBytesWithoutOperation + ", " +
-                "current_replica_operation_bytes=" + bytes + ", " +
-                "max_replica_write_bytes=" + currentReplicaWriteLimits + "]", false);
-        }
-        return () -> this.replicaWriteBytes.getAndAdd(-bytes);
-    }
-
-    public long getReplicaWriteBytes() {
-        return replicaWriteBytes.get();
-    }
-}

+ 3 - 3
server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

@@ -20,7 +20,7 @@ package org.elasticsearch.action.resync;
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.replication.ReplicationOperation;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -57,11 +57,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
     public TransportResyncReplicationAction(Settings settings, TransportService transportService,
                                             ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
                                             ShardStateAction shardStateAction, ActionFilters actionFilters,
-                                            WriteMemoryLimits writeMemoryLimits) {
+                                            IndexingPressure indexingPressure) {
         super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
             ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
             true, /* we should never reject resync because of thread pool capacity on primary */
-            writeMemoryLimits);
+            indexingPressure);
     }
 
     @Override

+ 7 - 7
server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

@@ -23,7 +23,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.action.support.WriteRequest;
@@ -61,26 +61,26 @@ public abstract class TransportWriteAction<
         > extends TransportReplicationAction<Request, ReplicaRequest, Response> {
 
     private final boolean forceExecution;
-    private final WriteMemoryLimits writeMemoryLimits;
+    private final IndexingPressure indexingPressure;
     private final String executor;
 
     protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
                                    ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
                                    ShardStateAction shardStateAction, ActionFilters actionFilters, Writeable.Reader<Request> request,
                                    Writeable.Reader<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary,
-                                   WriteMemoryLimits writeMemoryLimits) {
+                                   IndexingPressure indexingPressure) {
         // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
         // ThreadPool.Names.WRITE thread pool in this class.
         super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
             request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
         this.executor = executor;
         this.forceExecution = forceExecutionOnPrimary;
-        this.writeMemoryLimits = writeMemoryLimits;
+        this.indexingPressure = indexingPressure;
     }
 
     @Override
     protected Releasable checkOperationLimits(Request request) {
-        return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution);
+        return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution);
     }
 
     @Override
@@ -90,7 +90,7 @@ public abstract class TransportWriteAction<
         if (rerouteWasLocal) {
             return () -> {};
         } else {
-            return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution);
+            return indexingPressure.markIndexingOperationStarted(primaryOperationSize(request), forceExecution);
         }
     }
 
@@ -100,7 +100,7 @@ public abstract class TransportWriteAction<
 
     @Override
     protected Releasable checkReplicaLimits(ReplicaRequest request) {
-        return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request), forceExecution);
+        return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution);
     }
 
     protected long replicaOperationSize(ReplicaRequest request) {

+ 2 - 2
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -21,7 +21,7 @@ package org.elasticsearch.common.settings;
 import org.apache.logging.log4j.LogManager;
 import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
 import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.action.support.DestructiveOperations;
@@ -489,7 +489,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             FsHealthService.ENABLED_SETTING,
             FsHealthService.REFRESH_INTERVAL_SETTING,
             FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
-            WriteMemoryLimits.MAX_INDEXING_BYTES);
+            IndexingPressure.MAX_INDEXING_BYTES);
 
     static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();
 

+ 111 - 0
server/src/main/java/org/elasticsearch/index/IndexingPressure.java

@@ -0,0 +1,111 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index;
+
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.index.stats.IndexingPressureStats;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class IndexingPressure {
+
+    public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
+        Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope);
+
+    private final AtomicLong currentCoordinatingAndPrimaryBytes = new AtomicLong(0);
+    private final AtomicLong currentReplicaBytes = new AtomicLong(0);
+    private final AtomicLong totalCoordinatingAndPrimaryBytes = new AtomicLong(0);
+    private final AtomicLong totalReplicaBytes = new AtomicLong(0);
+    private final AtomicLong coordinatingAndPrimaryRejections = new AtomicLong(0);
+    private final AtomicLong replicaRejections = new AtomicLong(0);
+
+    private final long primaryAndCoordinatingLimits;
+    private final long replicaLimits;
+
+    public IndexingPressure(Settings settings) {
+        this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
+        this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
+    }
+
+    public Releasable markIndexingOperationStarted(long bytes) {
+        return markIndexingOperationStarted(bytes, false);
+    }
+
+    public Releasable markIndexingOperationStarted(long bytes, boolean forceExecution) {
+        long writeBytes = this.currentCoordinatingAndPrimaryBytes.addAndGet(bytes);
+        long replicaWriteBytes = this.currentReplicaBytes.get();
+        long totalBytes = writeBytes + replicaWriteBytes;
+        if (forceExecution == false && totalBytes > primaryAndCoordinatingLimits) {
+            long bytesWithoutOperation = writeBytes - bytes;
+            long totalBytesWithoutOperation = totalBytes - bytes;
+            this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
+            this.coordinatingAndPrimaryRejections.getAndIncrement();
+            throw new EsRejectedExecutionException("rejected execution of operation [" +
+                "coordinating_and_primary_bytes=" + bytesWithoutOperation + ", " +
+                "replica_bytes=" + replicaWriteBytes + ", " +
+                "all_bytes=" + totalBytesWithoutOperation + ", " +
+                "operation_bytes=" + bytes + ", " +
+                "max_coordinating_and_primary_bytes=" + primaryAndCoordinatingLimits + "]", false);
+        }
+        totalCoordinatingAndPrimaryBytes.getAndAdd(bytes);
+        return () -> this.currentCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
+    }
+
+    public Releasable markReplicaOperationStarted(long bytes, boolean forceExecution) {
+        long replicaWriteBytes = this.currentReplicaBytes.getAndAdd(bytes);
+        if (forceExecution == false && replicaWriteBytes > replicaLimits) {
+            long replicaBytesWithoutOperation = replicaWriteBytes - bytes;
+            this.currentReplicaBytes.getAndAdd(-bytes);
+            this.replicaRejections.getAndIncrement();
+            throw new EsRejectedExecutionException("rejected execution of replica operation [" +
+                "replica_bytes=" + replicaBytesWithoutOperation + ", " +
+                "replica_operation_bytes=" + bytes + ", " +
+                "max_replica_bytes=" + replicaLimits + "]", false);
+        }
+        totalReplicaBytes.getAndAdd(bytes);
+        return () -> this.currentReplicaBytes.getAndAdd(-bytes);
+    }
+
+    public long getCurrentCoordinatingAndPrimaryBytes() {
+        return currentCoordinatingAndPrimaryBytes.get();
+    }
+
+    public long getCurrentReplicaBytes() {
+        return currentReplicaBytes.get();
+    }
+
+    public long getTotalCoordinatingAndPrimaryBytes() {
+        return totalCoordinatingAndPrimaryBytes.get();
+    }
+
+    public long getTotalReplicaBytes() {
+        return totalReplicaBytes.get();
+    }
+
+    public IndexingPressureStats stats() {
+        return new IndexingPressureStats(totalCoordinatingAndPrimaryBytes.get(), totalReplicaBytes.get(),
+            currentCoordinatingAndPrimaryBytes.get(), currentReplicaBytes.get(), coordinatingAndPrimaryRejections.get(),
+            replicaRejections.get());
+    }
+}

+ 3 - 3
server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

@@ -25,7 +25,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteResponse;
@@ -80,7 +80,7 @@ public class RetentionLeaseSyncAction extends
             final ThreadPool threadPool,
             final ShardStateAction shardStateAction,
             final ActionFilters actionFilters,
-            final WriteMemoryLimits writeMemoryLimits) {
+            final IndexingPressure indexingPressure) {
         super(
                 settings,
                 ACTION_NAME,
@@ -92,7 +92,7 @@ public class RetentionLeaseSyncAction extends
                 actionFilters,
                 RetentionLeaseSyncAction.Request::new,
                 RetentionLeaseSyncAction.Request::new,
-                ThreadPool.Names.MANAGEMENT, false, writeMemoryLimits);
+                ThreadPool.Names.MANAGEMENT, false, indexingPressure);
     }
 
     @Override

+ 85 - 0
server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java

@@ -0,0 +1,85 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.stats;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+public class IndexingPressureStats implements Writeable, ToXContentFragment {
+
+    private final long totalCoordinatingAndPrimaryBytes;
+    private final long totalReplicaBytes;
+    private final long currentCoordinatingAndPrimaryBytes;
+    private final long currentReplicaBytes;
+    private final long coordinatingAndPrimaryRejections;
+    private final long replicaRejections;
+
+    public IndexingPressureStats(StreamInput in) throws IOException {
+        totalCoordinatingAndPrimaryBytes = in.readVLong();
+        totalReplicaBytes = in.readVLong();
+        currentCoordinatingAndPrimaryBytes = in.readVLong();
+        currentReplicaBytes = in.readVLong();
+        coordinatingAndPrimaryRejections = in.readVLong();
+        replicaRejections = in.readVLong();
+    }
+
+    public IndexingPressureStats(long totalCoordinatingAndPrimaryBytes, long totalReplicaBytes, long currentCoordinatingAndPrimaryBytes,
+                                 long currentReplicaBytes, long coordinatingAndPrimaryRejections, long replicaRejections) {
+        this.totalCoordinatingAndPrimaryBytes = totalCoordinatingAndPrimaryBytes;
+        this.totalReplicaBytes = totalReplicaBytes;
+        this.currentCoordinatingAndPrimaryBytes = currentCoordinatingAndPrimaryBytes;
+        this.currentReplicaBytes = currentReplicaBytes;
+        this.coordinatingAndPrimaryRejections = coordinatingAndPrimaryRejections;
+        this.replicaRejections = replicaRejections;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVLong(totalCoordinatingAndPrimaryBytes);
+        out.writeVLong(totalReplicaBytes);
+        out.writeVLong(currentCoordinatingAndPrimaryBytes);
+        out.writeVLong(currentReplicaBytes);
+        out.writeVLong(coordinatingAndPrimaryRejections);
+        out.writeVLong(replicaRejections);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject("indexing_pressure");
+        builder.startObject("total");
+        builder.field("coordinating_and_primary_bytes", totalCoordinatingAndPrimaryBytes);
+        builder.field("replica_bytes", totalReplicaBytes);
+        builder.field("all_bytes", totalReplicaBytes + totalCoordinatingAndPrimaryBytes);
+        builder.field("coordinating_and_primary_memory_limit_rejections", coordinatingAndPrimaryRejections);
+        builder.field("replica_memory_limit_rejections", replicaRejections);
+        builder.endObject();
+        builder.startObject("current");
+        builder.field("coordinating_and_primary_bytes", currentCoordinatingAndPrimaryBytes);
+        builder.field("replica_bytes", currentReplicaBytes);
+        builder.field("all_bytes", currentCoordinatingAndPrimaryBytes + currentReplicaBytes);
+        builder.endObject();
+        return builder.endObject();
+    }
+}

+ 4 - 4
server/src/main/java/org/elasticsearch/node/Node.java

@@ -30,7 +30,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionModule;
 import org.elasticsearch.action.ActionType;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.search.SearchExecutionStatsCollector;
 import org.elasticsearch.action.search.SearchPhaseController;
 import org.elasticsearch.action.search.SearchTransportService;
@@ -538,6 +538,7 @@ public class Node implements Closeable {
             final SearchTransportService searchTransportService =  new SearchTransportService(transportService,
                 SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
             final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
+            final IndexingPressure indexingLimits = new IndexingPressure(settings);
 
             final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
             RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,
@@ -566,7 +567,7 @@ public class Node implements Closeable {
             this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
                 transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
                 httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
-                searchTransportService);
+                searchTransportService, indexingLimits);
 
             final SearchService searchService = newSearchService(clusterService, indicesService,
                 threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
@@ -584,7 +585,6 @@ public class Node implements Closeable {
                 new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
             resourcesToClose.add(persistentTasksClusterService);
             final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
-            final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(settings);
 
             modules.add(b -> {
                     b.bind(Node.class).toInstance(this);
@@ -603,7 +603,7 @@ public class Node implements Closeable {
                     b.bind(ScriptService.class).toInstance(scriptService);
                     b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
                     b.bind(IngestService.class).toInstance(ingestService);
-                    b.bind(WriteMemoryLimits.class).toInstance(bulkIndexingLimits);
+                    b.bind(IndexingPressure.class).toInstance(indexingLimits);
                     b.bind(UsageService.class).toInstance(usageService);
                     b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
                     b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);

+ 8 - 4
server/src/main/java/org/elasticsearch/node/NodeService.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.node;
 
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.Build;
 import org.elasticsearch.Version;
@@ -59,6 +60,7 @@ public class NodeService implements Closeable {
     private final HttpServerTransport httpServerTransport;
     private final ResponseCollectorService responseCollectorService;
     private final SearchTransportService searchTransportService;
+    private final IndexingPressure indexingPressure;
 
     private final Discovery discovery;
 
@@ -67,7 +69,7 @@ public class NodeService implements Closeable {
                 CircuitBreakerService circuitBreakerService, ScriptService scriptService,
                 @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
                 SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService,
-                SearchTransportService searchTransportService) {
+                SearchTransportService searchTransportService, IndexingPressure indexingPressure) {
         this.settings = settings;
         this.threadPool = threadPool;
         this.monitorService = monitorService;
@@ -82,6 +84,7 @@ public class NodeService implements Closeable {
         this.scriptService = scriptService;
         this.responseCollectorService = responseCollectorService;
         this.searchTransportService = searchTransportService;
+        this.indexingPressure = indexingPressure;
         clusterService.addStateApplier(ingestService);
     }
 
@@ -103,7 +106,8 @@ public class NodeService implements Closeable {
 
     public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool,
                            boolean fs, boolean transport, boolean http, boolean circuitBreaker,
-                           boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache) {
+                           boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache,
+                           boolean indexingPressure) {
         // for indices stats we want to include previous allocated shards stats as well (it will
         // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
         return new NodeStats(transportService.getLocalNode(), System.currentTimeMillis(),
@@ -119,8 +123,8 @@ public class NodeService implements Closeable {
                 script ? scriptService.stats() : null,
                 discoveryStats ? discovery.stats() : null,
                 ingest ? ingestService.stats() : null,
-                adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null
-        );
+                adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
+                indexingPressure ? this.indexingPressure.stats() : null);
     }
 
     public IngestService getIngestService() {

+ 1 - 1
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -528,7 +528,7 @@ public class NodeStatsTests extends ESTestCase {
         //TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
         return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats,
                 fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats,
-                ingestStats, adaptiveSelectionStats);
+                ingestStats, adaptiveSelectionStats, null);
     }
 
     private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {

+ 2 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

@@ -37,6 +37,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
@@ -122,7 +123,7 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
         when(threadPool.executor(anyString())).thenReturn(direct);
         TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
                 null, null, mock(ActionFilters.class), null, null,
-            new WriteMemoryLimits(Settings.EMPTY)) {
+            new IndexingPressure(Settings.EMPTY)) {
             @Override
             void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
                     AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {

+ 2 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -52,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
@@ -143,7 +144,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
                 new AutoCreateIndex(
                     SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                     new IndexNameExpressionResolver()
-                ), new WriteMemoryLimits(SETTINGS)
+                ), new IndexingPressure(SETTINGS)
             );
         }
         @Override

+ 2 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -45,6 +45,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
@@ -82,7 +83,7 @@ public class TransportBulkActionTests extends ESTestCase {
             super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null,
                     null, new ActionFilters(Collections.emptySet()), new Resolver(),
                     new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()),
-                    new WriteMemoryLimits(Settings.EMPTY));
+                    new IndexingPressure(Settings.EMPTY));
         }
 
         @Override

+ 2 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java

@@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
@@ -233,7 +234,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
                     actionFilters,
                     indexNameExpressionResolver,
                     autoCreateIndex,
-                    new WriteMemoryLimits(Settings.EMPTY),
+                    new IndexingPressure(Settings.EMPTY),
                     relativeTimeProvider);
         }
 

+ 2 - 2
server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java

@@ -20,7 +20,7 @@ package org.elasticsearch.action.resync;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterState;
@@ -145,7 +145,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
 
                 final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
                     clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()),
-                    new WriteMemoryLimits(Settings.EMPTY));
+                    new IndexingPressure(Settings.EMPTY));
 
                 assertThat(action.globalBlockLevel(), nullValue());
                 assertThat(action.indexBlockLevel(), nullValue());

+ 3 - 3
server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -21,7 +21,7 @@ package org.elasticsearch.action.support.replication;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -367,7 +367,7 @@ public class TransportWriteActionTests extends ESTestCase {
                     new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
                         x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
                 new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
-                new WriteMemoryLimits(Settings.EMPTY));
+                new IndexingPressure(Settings.EMPTY));
             this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
             this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
         }
@@ -377,7 +377,7 @@ public class TransportWriteActionTests extends ESTestCase {
             super(settings, actionName, transportService, clusterService,
                     mockIndicesService(clusterService), threadPool, shardStateAction,
                     new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
-                    new WriteMemoryLimits(settings));
+                    new IndexingPressure(settings));
             this.withDocumentFailureOnPrimary = false;
             this.withDocumentFailureOnReplica = false;
         }

+ 6 - 6
server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java

@@ -152,11 +152,11 @@ public class DiskUsageTests extends ESTestCase {
         };
         List<NodeStats> nodeStats = Arrays.asList(
                 new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null),
+                        null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null, null),
                 new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null),
+                        null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null, null),
                 new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null)
+                        null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null, null)
         );
         InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
         DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");
@@ -193,11 +193,11 @@ public class DiskUsageTests extends ESTestCase {
         };
         List<NodeStats> nodeStats = Arrays.asList(
                 new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null),
+                        null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null, null),
                 new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null),
+                        null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null, null),
                 new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null)
+                        null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null, null)
         );
         InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages);
         DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1");

+ 4 - 4
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java

@@ -20,7 +20,7 @@
 package org.elasticsearch.index.seqno;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -106,7 +106,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
                 threadPool,
                 shardStateAction,
                 new ActionFilters(Collections.emptySet()),
-                new WriteMemoryLimits(Settings.EMPTY));
+                new IndexingPressure(Settings.EMPTY));
         final RetentionLeases retentionLeases = mock(RetentionLeases.class);
         final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
         action.dispatchedShardOperationOnPrimary(request, indexShard,
@@ -143,7 +143,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
                 threadPool,
                 shardStateAction,
                 new ActionFilters(Collections.emptySet()),
-                new WriteMemoryLimits(Settings.EMPTY));
+                new IndexingPressure(Settings.EMPTY));
         final RetentionLeases retentionLeases = mock(RetentionLeases.class);
         final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
 
@@ -182,7 +182,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
                 threadPool,
                 shardStateAction,
                 new ActionFilters(Collections.emptySet()),
-                new WriteMemoryLimits(Settings.EMPTY));
+                new IndexingPressure(Settings.EMPTY));
 
         assertNull(action.indexBlockLevel());
     }

+ 4 - 4
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -65,7 +65,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAct
 import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
 import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
 import org.elasticsearch.action.bulk.BulkAction;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.TransportBulkAction;
@@ -1551,7 +1551,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                             threadPool,
                             shardStateAction,
                             actionFilters,
-                            new WriteMemoryLimits(settings))),
+                            new IndexingPressure(settings))),
                     RetentionLeaseSyncer.EMPTY,
                     client);
                 final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
@@ -1566,7 +1566,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                         actionFilters, indexNameExpressionResolver
                     ));
                 final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings, clusterService);
-                final WriteMemoryLimits indexingMemoryLimits = new WriteMemoryLimits(settings);
+                final IndexingPressure indexingMemoryLimits = new IndexingPressure(settings);
                 mappingUpdatedAction.setClient(client);
                 actions.put(BulkAction.INSTANCE,
                     new TransportBulkAction(threadPool, transportService, clusterService,
@@ -1576,7 +1576,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                             Collections.emptyList(), client),
                         client, actionFilters, indexNameExpressionResolver,
                         new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver),
-                        new WriteMemoryLimits(settings)
+                        new IndexingPressure(settings)
                     ));
                 final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
                     clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

@@ -84,7 +84,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
                     .map(fsInfoPath -> diskUsageFunction.apply(discoveryNode, fsInfoPath))
                     .toArray(FsInfo.Path[]::new)), nodeStats.getTransport(),
                 nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(),
-                nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats());
+                nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getIndexingPressureStats());
         }).collect(Collectors.toList());
     }
 

+ 5 - 5
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -36,7 +36,7 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExc
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
@@ -1165,13 +1165,13 @@ public final class InternalTestCluster extends TestCluster {
     private void assertAllPendingWriteLimitsReleased() throws Exception {
         assertBusy(() -> {
             for (NodeAndClient nodeAndClient : nodes.values()) {
-                WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name);
-                final long writeBytes = writeMemoryLimits.getWriteBytes();
+                IndexingPressure indexingPressure = getInstance(IndexingPressure.class, nodeAndClient.name);
+                final long writeBytes = indexingPressure.getCurrentCoordinatingAndPrimaryBytes();
                 if (writeBytes > 0) {
                     throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node ["
                         + nodeAndClient.name + "].");
                 }
-                final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes();
+                final long replicaWriteBytes = indexingPressure.getCurrentReplicaBytes();
                 if (replicaWriteBytes > 0) {
                     throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node ["
                         + nodeAndClient.name + "].");
@@ -2259,7 +2259,7 @@ public final class InternalTestCluster extends TestCluster {
                 NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node);
                 CommonStatsFlags flags = new CommonStatsFlags(Flag.FieldData, Flag.QueryCache, Flag.Segments);
                 NodeStats stats = nodeService.stats(flags,
-                        false, false, false, false, false, false, false, false, false, false, false, false, false);
+                        false, false, false, false, false, false, false, false, false, false, false, false, false, false);
                 assertThat("Fielddata size must be 0 on node: " + stats.getNode(),
                         stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
                 assertThat("Query cache size must be 0 on node: " + stats.getNode(),

+ 3 - 3
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java

@@ -6,7 +6,7 @@
 
 package org.elasticsearch.xpack.ccr;
 
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -117,12 +117,12 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
             final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower");
             client().execute(PutFollowAction.INSTANCE, followRequest).get();
 
-            WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class);
+            IndexingPressure memoryLimits = getInstanceFromNode(IndexingPressure.class);
             final long finalSourceSize = sourceSize;
             assertBusy(() -> {
                 // The actual write bytes will be greater due to other request fields. However, this test is
                 // just spot checking that the bytes are incremented at all.
-                assertTrue(memoryLimits.getWriteBytes() > finalSourceSize);
+                assertTrue(memoryLimits.getCurrentCoordinatingAndPrimaryBytes() > finalSourceSize);
             });
             blocker.countDown();
             assertBusy(() -> {

+ 6 - 6
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

@@ -9,7 +9,7 @@ package org.elasticsearch.xpack.ccr.action.bulk;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.WriteMemoryLimits;
+import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -38,7 +38,7 @@ import java.util.List;
 public class TransportBulkShardOperationsAction
         extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
 
-    private final WriteMemoryLimits writeMemoryLimits;
+    private final IndexingPressure indexingPressure;
 
     @Inject
     public TransportBulkShardOperationsAction(
@@ -49,7 +49,7 @@ public class TransportBulkShardOperationsAction
             final ThreadPool threadPool,
             final ShardStateAction shardStateAction,
             final ActionFilters actionFilters,
-            final WriteMemoryLimits writeMemoryLimits) {
+            final IndexingPressure indexingPressure) {
         super(
                 settings,
                 BulkShardOperationsAction.NAME,
@@ -61,14 +61,14 @@ public class TransportBulkShardOperationsAction
                 actionFilters,
                 BulkShardOperationsRequest::new,
                 BulkShardOperationsRequest::new,
-                ThreadPool.Names.WRITE, false, writeMemoryLimits);
-        this.writeMemoryLimits = writeMemoryLimits;
+                ThreadPool.Names.WRITE, false, indexingPressure);
+        this.indexingPressure = indexingPressure;
     }
 
     @Override
     protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener) {
         // This is executed on the follower coordinator node and we need to mark the bytes.
-        Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
+        Releasable releasable = indexingPressure.markIndexingOperationStarted(primaryOperationSize(request));
         ActionListener<BulkShardOperationsResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
         try {
             super.doExecute(task, request, releasingListener);

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java

@@ -610,7 +610,7 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
             IntStream.range(0, pipelineNames.size()).boxed().collect(Collectors.toMap(pipelineNames::get, processorStats::get)));
         return new NodeStats(mock(DiscoveryNode.class),
             Instant.now().toEpochMilli(), null, null, null, null, null, null, null, null,
-            null, null, null, ingestStats, null);
+            null, null, null, ingestStats, null, null);
 
     }
 

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java

@@ -298,7 +298,7 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
             IntStream.range(0, pipelineids.size()).boxed().collect(Collectors.toMap(pipelineids::get, processorStats::get)));
         return new NodeStats(mock(DiscoveryNode.class),
             Instant.now().toEpochMilli(), null, null, null, null, null, null, null, null,
-            null, null, null, ingestStats, null);
+            null, null, null, ingestStats, null, null);
 
     }
 

+ 1 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

@@ -376,6 +376,6 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa
                                                                 emptySet(),
                                                                 Version.CURRENT);
 
-        return new NodeStats(discoveryNode, no, indices, os, process, jvm, threadPool, fs, null, null, null, null, null, null, null);
+        return new NodeStats(discoveryNode, no, indices, os, process, jvm, threadPool, fs, null, null, null, null, null, null, null, null);
     }
 }