
Enhance memory accounting for document expansion and introduce max document size limit (#123543)

This commit improves memory accounting by incorporating document
expansion during shard bulk execution. Additionally, it introduces a new
limit on the maximum document size, which defaults to 10% of the
available heap.

This limit can be configured using the new setting:

indexing_pressure.memory.max_operation_size

These changes help prevent excessive memory consumption and
improve indexing stability.
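
For illustration (a sketch, not part of this change), the new limit can be exercised directly against the IndexingPressure API introduced below; the "512kb" override, byte sizes, and variable names are made up for the example, and the usual org.elasticsearch imports (Settings, IndexingPressure, Releasable) are assumed:

    Settings settings = Settings.builder()
        .put(IndexingPressure.MAX_OPERATION_SIZE.getKey(), "512kb") // example override of the 10%-of-heap default
        .build();
    IndexingPressure indexingPressure = new IndexingPressure(settings);

    // A primary request whose largest single operation exceeds the limit is rejected
    // up front with an EsRejectedExecutionException and counted as a large-operation rejection.
    Releasable marked = indexingPressure.validateAndMarkPrimaryOperationStarted(
        1,           // operations in the shard request
        600 * 1024,  // total request bytes
        600 * 1024,  // largest single operation, above the 512kb example limit
        false,       // forceExecution
        false        // allowsOperationsBeyondSizeLimit
    );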

Closes ES-10777
Francisco Fernández Castaño 7 months ago
parent
commit
387eef070c
19 changed files with 594 additions and 32 deletions
  1. docs/changelog/123543.yaml (+6 -0)
  2. modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java (+1 -1)
  3. server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java (+2 -2)
  4. server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java (+238 -6)
  5. server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionIT.java (+4 -2)
  6. server/src/main/java/org/elasticsearch/TransportVersions.java (+1 -0)
  7. server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java (+7 -1)
  8. server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java (+26 -0)
  9. server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java (+39 -2)
  10. server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java (+5 -0)
  11. server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java (+24 -5)
  12. server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java (+1 -0)
  13. server/src/main/java/org/elasticsearch/index/IndexingPressure.java (+79 -6)
  14. server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java (+30 -1)
  15. server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java (+2 -0)
  16. server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java (+7 -3)
  17. server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java (+115 -2)
  18. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java (+5 -0)
  19. x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java (+2 -1)

+ 6 - 0
docs/changelog/123543.yaml

@@ -0,0 +1,6 @@
+pr: 123543
+summary: Enhance memory accounting for document expansion and introduce max document
+  size limit
+area: CRUD
+type: enhancement
+issues: []

+ 1 - 1
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/FailureStoreMetricsWithIncrementalBulkIT.java

@@ -112,7 +112,7 @@ public class FailureStoreMetricsWithIncrementalBulkIT extends ESIntegTestCase {
             IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
             long memoryLimit = primaryPressure.stats().getMemoryLimit();
             long primaryRejections = primaryPressure.stats().getPrimaryRejections();
-            try (Releasable ignored = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
+            try (Releasable ignored = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
                 while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
                     while (nextRequested.get()) {
                         nextRequested.set(false);

+ 2 - 2
server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java

@@ -399,7 +399,7 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
         long memoryLimit = primaryPressure.stats().getMemoryLimit();
         long primaryRejections = primaryPressure.stats().getPrimaryRejections();
-        try (Releasable releasable = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
+        try (Releasable releasable = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
             while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
                 while (nextRequested.get()) {
                     nextRequested.set(false);
@@ -497,7 +497,7 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         assertThat(node, equalTo(dataOnlyNode));
         IndexingPressure primaryPressure = internalCluster().getInstance(IndexingPressure.class, node);
         long memoryLimit = primaryPressure.stats().getMemoryLimit();
-        try (Releasable releasable = primaryPressure.markPrimaryOperationStarted(10, memoryLimit, false)) {
+        try (Releasable releasable = primaryPressure.validateAndMarkPrimaryOperationStarted(10, memoryLimit, 0, false, false)) {
             while (nextRequested.get()) {
                 nextRequested.set(false);
                 refCounted.incRef();

+ 238 - 6
server/src/internalClusterTest/java/org/elasticsearch/index/IndexingPressureIT.java

@@ -12,17 +12,24 @@ import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.TransportShardBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.replication.TransportReplicationAction;
+import org.elasticsearch.client.internal.Requests;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.shard.IndexingOperationListener;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalSettingsPlugin;
@@ -36,6 +43,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -43,6 +51,8 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
 public class IndexingPressureIT extends ESIntegTestCase {
@@ -58,7 +68,7 @@ public class IndexingPressureIT extends ESIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
+        return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class, PreIndexListenerInstallerPlugin.class);
     }
 
     @Override
@@ -101,7 +111,7 @@ public class IndexingPressureIT extends ESIntegTestCase {
         });
 
         final ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
-        final Releasable replicaRelease = blockReplicas(replicaThreadPool);
+        final Releasable replicaRelease = blockWriteThreadPool(replicaThreadPool);
 
         final BulkRequest bulkRequest = new BulkRequest();
         long totalRequestSize = 0;
@@ -260,7 +270,7 @@ public class IndexingPressureIT extends ESIntegTestCase {
         String coordinatingOnlyNode = getCoordinatingOnlyNode();
 
         final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName);
-        try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
+        try (Releasable replicaRelease = blockWriteThreadPool(replicaThreadPool)) {
             final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
 
             IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
@@ -324,7 +334,7 @@ public class IndexingPressureIT extends ESIntegTestCase {
         String coordinatingOnlyNode = getCoordinatingOnlyNode();
 
         final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName);
-        try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
+        try (Releasable replicaRelease = blockWriteThreadPool(replicaThreadPool)) {
             final ActionFuture<BulkResponse> successFuture = client(primaryName).bulk(bulkRequest);
 
             IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
@@ -372,7 +382,7 @@ public class IndexingPressureIT extends ESIntegTestCase {
         String coordinatingOnlyNode = getCoordinatingOnlyNode();
 
         final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName);
-        try (Releasable replicaRelease = blockReplicas(replicaThreadPool)) {
+        try (Releasable replicaRelease = blockWriteThreadPool(replicaThreadPool)) {
             // The write limit is set to 1MB. We will send up to 800KB to stay below that threshold.
             int thresholdToStopSending = 800 * 1024;
 
@@ -392,6 +402,189 @@ public class IndexingPressureIT extends ESIntegTestCase {
         }
     }
 
+    public void testWriteCanRejectOnPrimaryBasedOnMaxOperationSize() throws Exception {
+        final BulkRequest bulkRequest = new BulkRequest();
+        long firstInFlightRequestSizeInBytes = 0;
+        long firstInFlightRequestLargestDocumentSize = 0;
+        int numberOfIndexRequests = randomIntBetween(50, 100);
+        for (int i = 0; i < numberOfIndexRequests; i++) {
+            IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
+                .source(Collections.singletonMap("key", randomAlphaOfLength(randomIntBetween(50, 100))));
+            firstInFlightRequestSizeInBytes += request.ramBytesUsed();
+            firstInFlightRequestLargestDocumentSize = Math.max(firstInFlightRequestLargestDocumentSize, request.source().length());
+            assertTrue(request.ramBytesUsed() > request.source().length());
+            bulkRequest.add(request);
+        }
+
+        long maxPrimaryBytes = (long) (firstInFlightRequestSizeInBytes * 1.5);
+        restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_PRIMARY_BYTES.getKey(), maxPrimaryBytes + "B").build());
+
+        assertAcked(prepareCreate(INDEX_NAME, indexSettings(1, 1)));
+        ensureGreen(INDEX_NAME);
+
+        Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames();
+        String primaryName = primaryReplicaNodeNames.v1();
+        String replicaName = primaryReplicaNodeNames.v2();
+        String coordinatingOnlyNode = getCoordinatingOnlyNode();
+
+        var primaryIndexOperationBlockedLatch = new CountDownLatch(1);
+        var primaryIndexOperationDispatchedLatch = new CountDownLatch(1);
+        PreIndexListenerInstallerPlugin.installPreIndexListener(((shardId, index) -> {
+            if (index.origin().equals(Engine.Operation.Origin.PRIMARY)) {
+                primaryIndexOperationDispatchedLatch.countDown();
+                safeAwait(primaryIndexOperationBlockedLatch);
+            }
+        }));
+
+        final ThreadPool replicaThreadPool = internalCluster().getInstance(ThreadPool.class, replicaName);
+        final ThreadPool primaryThreadPool = internalCluster().getInstance(ThreadPool.class, primaryName);
+        try (
+            Releasable blockedPrimaryWriteThreadsRelease = blockWriteThreadPool(primaryThreadPool);
+            Releasable blockedReplicaWriteThreadsRelease = blockWriteThreadPool(replicaThreadPool)
+        ) {
+            final ActionFuture<BulkResponse> successFuture = client(primaryName).bulk(bulkRequest);
+
+            IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
+            IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
+            IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
+
+            long inFlightPrimaryBytesBeforeDispatch = primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes();
+            assertThat(inFlightPrimaryBytesBeforeDispatch, greaterThan(firstInFlightRequestSizeInBytes));
+
+            blockedPrimaryWriteThreadsRelease.close();
+            safeAwait(primaryIndexOperationDispatchedLatch);
+            assertThat(
+                primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(),
+                equalTo(inFlightPrimaryBytesBeforeDispatch + firstInFlightRequestLargestDocumentSize * 4)
+            );
+            primaryIndexOperationBlockedLatch.countDown();
+            PreIndexListenerInstallerPlugin.resetPreIndexListener();
+
+            var roomBeforePrimaryRejectionsInBytes = maxPrimaryBytes - primaryWriteLimits.stats()
+                .getCurrentCombinedCoordinatingAndPrimaryBytes();
+            var bulkRequestWithLargeExpandedOperation = new BulkRequest();
+            var indexRequest = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
+                .source(Collections.singletonMap("key", randomAlphaOfLength((int) roomBeforePrimaryRejectionsInBytes / 2)));
+            // We need to ensure that the request would be able to be dispatched but once the document is expanded it will be rejected
+            assertThat(indexRequest.ramBytesUsed(), is(lessThan(roomBeforePrimaryRejectionsInBytes)));
+            bulkRequestWithLargeExpandedOperation.add(indexRequest);
+
+            BulkResponse responses = client(coordinatingOnlyNode).bulk(bulkRequestWithLargeExpandedOperation).actionGet();
+            assertTrue(responses.hasFailures());
+            BulkItemResponse.Failure failure = responses.getItems()[0].getFailure();
+
+            // The indexing memory pressure rejection for this request happens once the primary operation is dispatched onto the write
+            // thread pool, hence the triple exception nesting:
+            // 1. TransportBulkAction (coordinator)
+            // 2. TransportShardBulkAction (coordinator)
+            // 3. TransportShardBulkAction (primary)
+            // 4. TransportShardBulkAction[p] (primary)
+            assertThat(failure.getCause().getCause().getCause(), instanceOf(EsRejectedExecutionException.class));
+
+            blockedReplicaWriteThreadsRelease.close();
+
+            successFuture.actionGet();
+
+            assertEquals(0, primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+            assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaBytes());
+            assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+            assertEquals(0, replicaWriteLimits.stats().getCurrentReplicaBytes());
+            assertEquals(0, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+            assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaBytes());
+        }
+    }
+
+    public void testWriteCanRejectOnReplicaBasedOnMaxDocumentSize() throws Exception {
+        final BulkRequest bulkRequest = new BulkRequest();
+        long totalRequestSize = 0;
+        int numberOfIndexRequests = randomIntBetween(50, 100);
+        for (int i = 0; i < numberOfIndexRequests; ++i) {
+            IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()).source(Requests.INDEX_CONTENT_TYPE);
+            totalRequestSize += request.ramBytesUsed();
+            bulkRequest.add(request);
+        }
+
+        // The request meets the primary limits, but the replica limits are set lower, preventing it from proceeding
+        IndexRequest largeIndexRequest = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
+            .source(Collections.singletonMap("key", randomAlphaOfLength((int) totalRequestSize)));
+        bulkRequest.add(largeIndexRequest);
+        totalRequestSize += largeIndexRequest.ramBytesUsed();
+
+        final long bulkShardRequestSize = totalRequestSize;
+        restartNodesWithSettings(
+            Settings.builder()
+                .put(IndexingPressure.MAX_PRIMARY_BYTES.getKey(), bulkShardRequestSize * 5 + "B")
+                .put(IndexingPressure.MAX_REPLICA_BYTES.getKey(), (long) (bulkShardRequestSize * 1.5) + "B")
+                // Ensure that the replica request fails straight away after the first rejection
+                .put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), 0)
+                .build()
+        );
+
+        assertAcked(prepareCreate(INDEX_NAME, indexSettings(1, 1)));
+        ensureGreen(INDEX_NAME);
+
+        Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames();
+        String primaryName = primaryReplicaNodeNames.v1();
+        String replicaName = primaryReplicaNodeNames.v2();
+        String coordinatingOnlyNode = getCoordinatingOnlyNode();
+
+        final ActionFuture<BulkResponse> successFuture = client(primaryName).bulk(bulkRequest);
+
+        IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
+        IndexingPressure replicaWriteLimits = internalCluster().getInstance(IndexingPressure.class, replicaName);
+        IndexingPressure coordinatingWriteLimits = internalCluster().getInstance(IndexingPressure.class, coordinatingOnlyNode);
+
+        BulkResponse responses = successFuture.actionGet();
+        assertFalse(responses.hasFailures());
+
+        assertEquals(0, primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+        assertEquals(0, primaryWriteLimits.stats().getCurrentReplicaBytes());
+        assertEquals(0, primaryWriteLimits.stats().getPrimaryRejections());
+
+        assertEquals(0, replicaWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+        assertEquals(0, replicaWriteLimits.stats().getCurrentReplicaBytes());
+        assertEquals(1L, replicaWriteLimits.stats().getReplicaRejections());
+
+        assertEquals(0, coordinatingWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+        assertEquals(0, coordinatingWriteLimits.stats().getCurrentReplicaBytes());
+    }
+
+    public void testDocumentsBeyondMaxSizeAreRejected() throws Exception {
+        restartNodesWithSettings(Settings.builder().put(IndexingPressure.MAX_OPERATION_SIZE.getKey(), "10B").build());
+
+        assertAcked(prepareCreate(INDEX_NAME, indexSettings(1, 1)));
+        ensureGreen(INDEX_NAME);
+
+        Tuple<String, String> primaryReplicaNodeNames = getPrimaryReplicaNodeNames();
+        String primaryName = primaryReplicaNodeNames.v1();
+        String coordinatingOnlyNode = getCoordinatingOnlyNode();
+
+        final BulkRequest bulkRequest = new BulkRequest();
+        int numberOfIndexRequests = randomIntBetween(50, 100);
+        for (int i = 0; i < numberOfIndexRequests; ++i) {
+            IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
+                .source(Collections.singletonMap("key", randomAlphaOfLength(50)));
+            bulkRequest.add(request);
+        }
+        final ActionFuture<BulkResponse> successFuture = client(randomBoolean() ? primaryName : coordinatingOnlyNode).bulk(bulkRequest);
+
+        IndexingPressure primaryWriteLimits = internalCluster().getInstance(IndexingPressure.class, primaryName);
+
+        BulkResponse responses = successFuture.actionGet();
+        assertTrue(responses.hasFailures());
+
+        assertEquals(0, primaryWriteLimits.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+        assertEquals(0, primaryWriteLimits.stats().getCurrentPrimaryOps());
+        assertEquals(0, primaryWriteLimits.stats().getCurrentPrimaryBytes());
+
+        assertEquals(0, primaryWriteLimits.stats().getTotalPrimaryOps());
+        assertEquals(0, primaryWriteLimits.stats().getTotalPrimaryBytes());
+
+        assertEquals(1L, primaryWriteLimits.stats().getPrimaryRejections());
+        assertEquals(1L, primaryWriteLimits.stats().getLargeOpsRejections());
+        assertThat(primaryWriteLimits.stats().getTotalLargeRejectedOpsBytes(), is(greaterThanOrEqualTo(50L)));
+    }
+
     private void restartNodesWithSettings(Settings settings) throws Exception {
         internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
             @Override
@@ -433,7 +626,7 @@ public class IndexingPressureIT extends ESIntegTestCase {
         return new Tuple<>(primaryName, replicaName);
     }
 
-    private Releasable blockReplicas(ThreadPool threadPool) {
+    private Releasable blockWriteThreadPool(ThreadPool threadPool) {
         final CountDownLatch blockReplication = new CountDownLatch(1);
         final int threads = threadPool.info(ThreadPool.Names.WRITE).getMax();
         final CountDownLatch pointReached = new CountDownLatch(threads);
@@ -454,4 +647,43 @@ public class IndexingPressureIT extends ESIntegTestCase {
             }
         };
     }
+
+    public static class PreIndexListenerInstallerPlugin extends Plugin {
+        public static AtomicReference<CheckedBiConsumer<ShardId, Engine.Index, Exception>> PRE_INDEX_CHECK_REF = new AtomicReference<>(
+            ((shardId, index) -> {})
+        );
+
+        public PreIndexListenerInstallerPlugin() {}
+
+        @Override
+        public void onIndexModule(IndexModule indexModule) {
+            indexModule.addIndexOperationListener(new InjectablePreIndexOperationListener(PRE_INDEX_CHECK_REF));
+        }
+
+        public static void installPreIndexListener(CheckedBiConsumer<ShardId, Engine.Index, Exception> preIndexCheck) {
+            PRE_INDEX_CHECK_REF.set(preIndexCheck);
+        }
+
+        public static void resetPreIndexListener() {
+            PRE_INDEX_CHECK_REF.set((shardId, index) -> {});
+        }
+    }
+
+    static class InjectablePreIndexOperationListener implements IndexingOperationListener {
+        private final AtomicReference<CheckedBiConsumer<ShardId, Engine.Index, Exception>> preIndexCheckRef;
+
+        InjectablePreIndexOperationListener(AtomicReference<CheckedBiConsumer<ShardId, Engine.Index, Exception>> preIndexCheckRef) {
+            this.preIndexCheckRef = preIndexCheckRef;
+        }
+
+        @Override
+        public Engine.Index preIndex(ShardId shardId, Engine.Index index) {
+            try {
+                preIndexCheckRef.get().accept(shardId, index);
+            } catch (Exception e) {
+                throw new AssertionError("unexpected error", e);
+            }
+            return index;
+        }
+    }
 }

+ 4 - 2
server/src/internalClusterTest/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionIT.java

@@ -90,10 +90,12 @@ public class RetentionLeaseSyncActionIT extends ESIntegTestCase {
      */
     private static Releasable fullyAllocatePrimaryIndexingCapacityOnNode(String targetNode) {
         return internalCluster().getInstance(IndexingPressure.class, targetNode)
-            .markPrimaryOperationStarted(
+            .validateAndMarkPrimaryOperationStarted(
                 1,
                 IndexingPressure.MAX_PRIMARY_BYTES.get(internalCluster().getInstance(Settings.class, targetNode)).getBytes() + 1,
-                true
+                0,
+                true,
+                false
             );
     }
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -185,6 +185,7 @@ public class TransportVersions {
     public static final TransportVersion RE_REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(9_021_0_00);
     public static final TransportVersion UNASSIGENEDINFO_RESHARD_ADDED = def(9_022_0_00);
     public static final TransportVersion INCLUDE_INDEX_MODE_IN_GET_DATA_STREAM = def(9_023_0_00);
+    public static final TransportVersion MAX_OPERATION_SIZE_REJECTIONS_ADDED = def(9_024_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 7 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java

@@ -795,6 +795,8 @@ public class ClusterStatsNodes implements ToXContentFragment {
             long currentReplicaOps = 0;
             long lowWaterMarkSplits = 0;
             long highWaterMarkSplits = 0;
+            long largeOpsRejections = 0;
+            long totalLargeRejectedOpsBytes = 0;
             for (NodeStats nodeStat : nodeStats) {
                 IndexingPressureStats nodeStatIndexingPressureStats = nodeStat.getIndexingPressureStats();
                 if (nodeStatIndexingPressureStats != null) {
@@ -820,6 +822,8 @@ public class ClusterStatsNodes implements ToXContentFragment {
                     totalCoordinatingRequests += nodeStatIndexingPressureStats.getTotalCoordinatingRequests();
                     lowWaterMarkSplits += nodeStatIndexingPressureStats.getLowWaterMarkSplits();
                     highWaterMarkSplits += nodeStatIndexingPressureStats.getHighWaterMarkSplits();
+                    largeOpsRejections += nodeStatIndexingPressureStats.getLargeOpsRejections();
+                    totalLargeRejectedOpsBytes += nodeStatIndexingPressureStats.getTotalLargeRejectedOpsBytes();
                 }
             }
             indexingPressureStats = new IndexingPressureStats(
@@ -844,7 +848,9 @@ public class ClusterStatsNodes implements ToXContentFragment {
                 primaryDocumentRejections,
                 totalCoordinatingRequests,
                 lowWaterMarkSplits,
-                highWaterMarkSplits
+                highWaterMarkSplits,
+                largeOpsRejections,
+                totalLargeRejectedOpsBytes
             );
         }
 

+ 26 - 0
server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

@@ -103,6 +103,24 @@ public final class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequ
         return totalSizeInBytes;
     }
 
+    public long maxOperationSizeInBytes() {
+        long maxOperationSizeInBytes = 0;
+        for (int i = 0; i < items.length; i++) {
+            DocWriteRequest<?> request = items[i].request();
+            if (request instanceof IndexRequest) {
+                if (((IndexRequest) request).source() != null) {
+                    maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((IndexRequest) request).source().length());
+                }
+            } else if (request instanceof UpdateRequest) {
+                IndexRequest doc = ((UpdateRequest) request).doc();
+                if (doc != null && doc.source() != null) {
+                    maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, doc.source().length());
+                }
+            }
+        }
+        return maxOperationSizeInBytes;
+    }
+
     public BulkItemRequest[] items() {
         return items;
     }
@@ -199,6 +217,14 @@ public final class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequ
         return sum;
     }
 
+    public long largestOperationSize() {
+        long maxOperationSize = 0;
+        for (BulkItemRequest item : items) {
+            maxOperationSize = Math.max(maxOperationSize, item.ramBytesUsed());
+        }
+        return maxOperationSize;
+    }
+
     public boolean isSimulated() {
         return isSimulated;
     }

+ 39 - 2
server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -87,6 +87,13 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
 
     private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class);
 
+    // Represents the maximum memory overhead factor for an operation when processed for indexing.
+    // This accounts for potential increases in memory usage due to document expansion, including:
+    // 1. If the document source is not stored in a contiguous byte array, it will be copied to ensure contiguity.
+    // 2. If the document contains strings, Jackson uses char arrays (2 bytes per character) to parse string fields, doubling memory usage.
+    // 3. Parsed string fields create new copies of their data, further increasing memory consumption.
+    private static final int MAX_EXPANDED_OPERATION_MEMORY_OVERHEAD_FACTOR = 4;
+
     private final UpdateHelper updateHelper;
     private final MappingUpdatedAction mappingUpdatedAction;
     private final Consumer<Runnable> postWriteAction;
@@ -161,8 +168,16 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     protected void dispatchedShardOperationOnPrimary(
         BulkShardRequest request,
         IndexShard primary,
-        ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
+        ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> outerListener
     ) {
+        var listener = ActionListener.releaseBefore(
+            indexingPressure.trackPrimaryOperationExpansion(
+                primaryOperationCount(request),
+                getMaxOperationMemoryOverhead(request),
+                force(request)
+            ),
+            outerListener
+        );
         ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
         performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
             assert update != null;
@@ -200,6 +215,16 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         return request.items().length;
     }
 
+    @Override
+    protected long primaryLargestOperationSize(BulkShardRequest request) {
+        return request.largestOperationSize();
+    }
+
+    @Override
+    protected boolean primaryAllowsOperationsBeyondSizeLimit(BulkShardRequest request) {
+        return false;
+    }
+
     public static void performOnPrimary(
         BulkShardRequest request,
         IndexShard primary,
@@ -638,7 +663,15 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     }
 
     @Override
-    protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
+    protected void dispatchedShardOperationOnReplica(
+        BulkShardRequest request,
+        IndexShard replica,
+        ActionListener<ReplicaResult> outerListener
+    ) {
+        var listener = ActionListener.releaseBefore(
+            indexingPressure.trackReplicaOperationExpansion(getMaxOperationMemoryOverhead(request), force(request)),
+            outerListener
+        );
         ActionListener.completeWith(listener, () -> {
             final long startBulkTime = System.nanoTime();
             final Translog.Location location = performOnReplica(request, replica);
@@ -647,6 +680,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         });
     }
 
+    private static long getMaxOperationMemoryOverhead(BulkShardRequest request) {
+        return request.maxOperationSizeInBytes() * MAX_EXPANDED_OPERATION_MEMORY_OVERHEAD_FACTOR;
+    }
+
     @Override
     protected long replicaOperationSize(BulkShardRequest request) {
         return request.ramBytesUsed();
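
To make the MAX_EXPANDED_OPERATION_MEMORY_OVERHEAD_FACTOR comment above concrete (an illustrative calculation, not code from this diff), the extra bytes reserved at dispatch time scale with the largest document in the bulk shard request:

    // Worked example of getMaxOperationMemoryOverhead(request):
    long largestDocSourceBytes = 1024 * 1024;                // request.maxOperationSizeInBytes(), e.g. a 1 MiB source
    long expansionOverheadBytes = largestDocSourceBytes * 4; // MAX_EXPANDED_OPERATION_MEMORY_OVERHEAD_FACTOR
    // => 4 MiB is additionally accounted via indexingPressure.trackPrimaryOperationExpansion(...)
    //    (or trackReplicaOperationExpansion(...) on the replica) while the dispatched shard
    //    operation runs, and released just before the outer listener is notified.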

+ 5 - 0
server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

@@ -138,6 +138,11 @@ public class TransportResyncReplicationAction extends TransportWriteAction<
         return request.getOperations().length;
     }
 
+    @Override
+    protected long primaryLargestOperationSize(ResyncReplicationRequest request) {
+        return Stream.of(request.getOperations()).mapToLong(Translog.Operation::estimateSize).max().orElse(0);
+    }
+
     public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) {
         return request;
     }

+ 24 - 5
server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

@@ -117,7 +117,13 @@ public abstract class TransportWriteAction<
 
     @Override
     protected Releasable checkOperationLimits(Request request) {
-        return indexingPressure.markPrimaryOperationStarted(primaryOperationCount(request), primaryOperationSize(request), force(request));
+        return indexingPressure.validateAndMarkPrimaryOperationStarted(
+            primaryOperationCount(request),
+            primaryOperationSize(request),
+            primaryLargestOperationSize(request),
+            force(request),
+            primaryAllowsOperationsBeyondSizeLimit(request)
+        );
     }
 
     protected boolean force(ReplicatedWriteRequest<?> request) {
@@ -137,9 +143,11 @@ public abstract class TransportWriteAction<
             // If this primary request was received from a local reroute initiated by the node client, we
             // must mark a new primary operation local to the coordinating node.
             if (localRerouteInitiatedByNodeClient) {
-                return indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(
+                return indexingPressure.validateAndMarkPrimaryOperationLocalToCoordinatingNodeStarted(
                     primaryOperationCount(request),
-                    primaryOperationSize(request)
+                    primaryOperationSize(request),
+                    primaryLargestOperationSize(request),
+                    primaryAllowsOperationsBeyondSizeLimit(request)
                 );
             } else {
                 return () -> {};
@@ -148,11 +156,14 @@ public abstract class TransportWriteAction<
             // If this primary request was received directly from the network, we must mark a new primary
             // operation. This happens if the write action skips the reroute step (ex: rsync) or during
             // primary delegation, after the primary relocation hand-off.
-            return indexingPressure.markPrimaryOperationStarted(
+            return indexingPressure.validateAndMarkPrimaryOperationStarted(
                 primaryOperationCount(request),
                 primaryOperationSize(request),
-                force(request)
+                primaryLargestOperationSize(request),
+                force(request),
+                primaryAllowsOperationsBeyondSizeLimit(request)
             );
+
         }
     }
 
@@ -164,6 +175,14 @@ public abstract class TransportWriteAction<
         return 0;
     }
 
+    protected long primaryLargestOperationSize(Request request) {
+        return 0;
+    }
+
+    protected boolean primaryAllowsOperationsBeyondSizeLimit(Request request) {
+        return true;
+    }
+
     @Override
     protected Releasable checkReplicaLimits(ReplicaRequest request) {
         return indexingPressure.markReplicaOperationStarted(replicaOperationCount(request), replicaOperationSize(request), force(request));

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -577,6 +577,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
         FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
         IndexingPressure.MAX_INDEXING_BYTES,
         IndexingPressure.MAX_COORDINATING_BYTES,
+        IndexingPressure.MAX_OPERATION_SIZE,
         IndexingPressure.MAX_PRIMARY_BYTES,
         IndexingPressure.MAX_REPLICA_BYTES,
         IndexingPressure.SPLIT_BULK_THRESHOLD,

+ 79 - 6
server/src/main/java/org/elasticsearch/index/IndexingPressure.java

@@ -80,6 +80,12 @@ public class IndexingPressure {
         Setting.Property.NodeScope
     );
 
+    public static final Setting<ByteSizeValue> MAX_OPERATION_SIZE = Setting.memorySizeSetting(
+        "indexing_pressure.memory.max_operation_size",
+        "10%",
+        Setting.Property.NodeScope
+    );
+
     private static final Logger logger = LogManager.getLogger(IndexingPressure.class);
 
     private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
@@ -109,6 +115,9 @@ public class IndexingPressure {
     private final AtomicLong lowWaterMarkSplits = new AtomicLong(0);
     private final AtomicLong highWaterMarkSplits = new AtomicLong(0);
 
+    private final AtomicLong largeOpsRejections = new AtomicLong(0);
+    private final AtomicLong totalRejectedLargeOpsBytes = new AtomicLong(0);
+
     private final long lowWatermark;
     private final long lowWatermarkSize;
     private final long highWatermark;
@@ -116,6 +125,7 @@ public class IndexingPressure {
     private final long coordinatingLimit;
     private final long primaryLimit;
     private final long replicaLimit;
+    private final long operationLimit;
 
     public IndexingPressure(Settings settings) {
         this.lowWatermark = SPLIT_BULK_LOW_WATERMARK.get(settings).getBytes();
@@ -125,6 +135,7 @@ public class IndexingPressure {
         this.coordinatingLimit = MAX_COORDINATING_BYTES.get(settings).getBytes();
         this.primaryLimit = MAX_PRIMARY_BYTES.get(settings).getBytes();
         this.replicaLimit = MAX_REPLICA_BYTES.get(settings).getBytes();
+        this.operationLimit = MAX_OPERATION_SIZE.get(settings).getBytes();
     }
 
     private static Releasable wrapReleasable(Releasable releasable) {
@@ -302,7 +313,13 @@ public class IndexingPressure {
         }
     }
 
-    public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(int operations, long bytes) {
+    public Releasable validateAndMarkPrimaryOperationLocalToCoordinatingNodeStarted(
+        int operations,
+        long bytes,
+        long largestOperationSizeInBytes,
+        boolean allowsOperationsBeyondSizeLimit
+    ) {
+        checkLargestPrimaryOperationIsWithinLimits(operations, largestOperationSizeInBytes, allowsOperationsBeyondSizeLimit);
         currentPrimaryBytes.getAndAdd(bytes);
         currentPrimaryOps.getAndAdd(operations);
         totalPrimaryBytes.getAndAdd(bytes);
@@ -313,7 +330,49 @@ public class IndexingPressure {
         });
     }
 
-    public Releasable markPrimaryOperationStarted(int operations, long bytes, boolean forceExecution) {
+    void checkLargestPrimaryOperationIsWithinLimits(
+        int operations,
+        long largestOperationSizeInBytes,
+        boolean allowsOperationsBeyondSizeLimit
+    ) {
+        if (largestOperationSizeInBytes > operationLimit) {
+            this.largeOpsRejections.getAndIncrement();
+            this.totalRejectedLargeOpsBytes.addAndGet(largestOperationSizeInBytes);
+            if (allowsOperationsBeyondSizeLimit == false) {
+                this.primaryRejections.getAndIncrement();
+                this.primaryDocumentRejections.addAndGet(operations);
+                throw new EsRejectedExecutionException(
+                    "Request contains an operation of size ["
+                        + largestOperationSizeInBytes
+                        + "] bytes, which exceeds the maximum allowed limit of ["
+                        + operationLimit
+                        + "] bytes"
+                );
+            }
+        }
+    }
+
+    public Releasable validateAndMarkPrimaryOperationStarted(
+        int operations,
+        long bytes,
+        long largestOperationSizeInBytes,
+        boolean forceExecution,
+        boolean allowsOperationsBeyondSizeLimit
+    ) {
+        checkLargestPrimaryOperationIsWithinLimits(operations, largestOperationSizeInBytes, allowsOperationsBeyondSizeLimit);
+        return markPrimaryOperationStarted(operations, bytes, forceExecution, false);
+    }
+
+    public Releasable trackPrimaryOperationExpansion(int operations, long expandedBytes, boolean forceExecution) {
+        return markPrimaryOperationStarted(operations, expandedBytes, forceExecution, true);
+    }
+
+    // visible for testing
+    Releasable markPrimaryOperationStarted(int operations, long bytes, boolean forceExecution) {
+        return markPrimaryOperationStarted(operations, bytes, forceExecution, false);
+    }
+
+    private Releasable markPrimaryOperationStarted(int operations, long bytes, boolean forceExecution, boolean operationExpansionTracking) {
         long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
         long replicaWriteBytes = this.currentReplicaBytes.get();
         long totalBytes = combinedBytes + replicaWriteBytes;
@@ -345,18 +404,30 @@ public class IndexingPressure {
         }
         logger.trace(() -> Strings.format("adding [%d] primary operations and [%d] bytes", operations, bytes));
         currentPrimaryBytes.getAndAdd(bytes);
-        currentPrimaryOps.getAndAdd(operations);
         totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
         totalPrimaryBytes.getAndAdd(bytes);
-        totalPrimaryOps.getAndAdd(operations);
+        // If operation expansion is being tracked, we don't re-count the operations,
+        // as they were already included in the request when it was initially received
+        if (operationExpansionTracking == false) {
+            currentPrimaryOps.getAndAdd(operations);
+            totalPrimaryOps.getAndAdd(operations);
+        }
         return wrapReleasable(() -> {
             logger.trace(() -> Strings.format("removing [%d] primary operations and [%d] bytes", operations, bytes));
             this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
             this.currentPrimaryBytes.getAndAdd(-bytes);
-            this.currentPrimaryOps.getAndAdd(-operations);
+            if (operationExpansionTracking == false) {
+                this.currentPrimaryOps.getAndAdd(-operations);
+            }
         });
     }
 
+    public Releasable trackReplicaOperationExpansion(long expandedBytes, boolean forceExecution) {
+        // Operations are already tracked by the initial call to #markReplicaStarted.
+        // This method only increments the in-flight bytes to account for operation expansion during indexing.
+        return markReplicaOperationStarted(0, expandedBytes, forceExecution);
+    }
+
     public Releasable markReplicaOperationStarted(int operations, long bytes, boolean forceExecution) {
         long replicaWriteBytes = this.currentReplicaBytes.addAndGet(bytes);
         if (forceExecution == false && replicaWriteBytes > replicaLimit) {
@@ -409,7 +480,9 @@ public class IndexingPressure {
             primaryDocumentRejections.get(),
             totalCoordinatingRequests.get(),
             lowWaterMarkSplits.get(),
-            highWaterMarkSplits.get()
+            highWaterMarkSplits.get(),
+            largeOpsRejections.get(),
+            totalRejectedLargeOpsBytes.get()
         );
     }
 }
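
Taken together, the new IndexingPressure entry points split the accounting into an admission step and an expansion step. A minimal usage sketch follows (illustrative sizes and variable names, not code from this commit):

    IndexingPressure pressure = new IndexingPressure(Settings.EMPTY);

    // 1. Admission, when the shard request first arrives on the primary: validates the
    //    largest operation against indexing_pressure.memory.max_operation_size and
    //    reserves the raw request bytes and operation count.
    Releasable admitted = pressure.validateAndMarkPrimaryOperationStarted(
        2, 8 * 1024, 4 * 1024, false, false);

    // 2. Expansion, when the operation is dispatched onto the write thread pool:
    //    reserves the estimated parsing/indexing overhead without re-counting the
    //    operations (they were already counted in step 1).
    Releasable expanded = pressure.trackPrimaryOperationExpansion(2, 4 * 1024 * 4, false);

    // 3. Release in reverse order as the bulk execution and response complete.
    expanded.close();
    admitted.close();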

+ 30 - 1
server/src/main/java/org/elasticsearch/index/stats/IndexingPressureStats.java

@@ -41,6 +41,8 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
      */
     private final long lowWaterMarkSplits;
     private final long highWaterMarkSplits;
+    private final long largeOpsRejections;
+    private final long totalLargeRejectedOpsBytes;
 
     // These fields will be used for additional back-pressure and metrics in the future
     private final long totalCoordinatingOps;
@@ -95,6 +97,14 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
             lowWaterMarkSplits = -1L;
             highWaterMarkSplits = -1L;
         }
+
+        if (in.getTransportVersion().onOrAfter(TransportVersions.MAX_OPERATION_SIZE_REJECTIONS_ADDED)) {
+            largeOpsRejections = in.readVLong();
+            totalLargeRejectedOpsBytes = in.readVLong();
+        } else {
+            largeOpsRejections = -1L;
+            totalLargeRejectedOpsBytes = -1L;
+        }
     }
 
     public IndexingPressureStats(
@@ -119,7 +129,9 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
         long primaryDocumentRejections,
         long totalCoordinatingRequests,
         long lowWaterMarkSplits,
-        long highWaterMarkSplits
+        long highWaterMarkSplits,
+        long largeOpsRejections,
+        long totalRejectedLargeOpsBytes
     ) {
         this.totalCombinedCoordinatingAndPrimaryBytes = totalCombinedCoordinatingAndPrimaryBytes;
         this.totalCoordinatingBytes = totalCoordinatingBytes;
@@ -146,6 +158,8 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
 
         this.lowWaterMarkSplits = lowWaterMarkSplits;
         this.highWaterMarkSplits = highWaterMarkSplits;
+        this.largeOpsRejections = largeOpsRejections;
+        this.totalLargeRejectedOpsBytes = totalRejectedLargeOpsBytes;
     }
 
     @Override
@@ -178,6 +192,11 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
             out.writeVLong(lowWaterMarkSplits);
             out.writeVLong(highWaterMarkSplits);
         }
+
+        if (out.getTransportVersion().onOrAfter(TransportVersions.MAX_OPERATION_SIZE_REJECTIONS_ADDED)) {
+            out.writeVLong(largeOpsRejections);
+            out.writeVLong(totalLargeRejectedOpsBytes);
+        }
     }
 
     public long getTotalCombinedCoordinatingAndPrimaryBytes() {
@@ -268,6 +287,14 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
         return lowWaterMarkSplits;
     }
 
+    public long getLargeOpsRejections() {
+        return largeOpsRejections;
+    }
+
+    public long getTotalLargeRejectedOpsBytes() {
+        return totalLargeRejectedOpsBytes;
+    }
+
     private static final String COMBINED = "combined_coordinating_and_primary";
     private static final String COMBINED_IN_BYTES = "combined_coordinating_and_primary_in_bytes";
     private static final String COORDINATING = "coordinating";
@@ -284,6 +311,7 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
     private static final String PRIMARY_DOCUMENT_REJECTIONS = "primary_document_rejections";
     private static final String LIMIT = "limit";
     private static final String LIMIT_IN_BYTES = "limit_in_bytes";
+    private static final String LARGE_OPERATION_REJECTIONS = "large_operation_rejections";
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -310,6 +338,7 @@ public class IndexingPressureStats implements Writeable, ToXContentFragment {
         builder.field(PRIMARY_REJECTIONS, primaryRejections);
         builder.field(REPLICA_REJECTIONS, replicaRejections);
         builder.field(PRIMARY_DOCUMENT_REJECTIONS, primaryDocumentRejections);
+        builder.field(LARGE_OPERATION_REJECTIONS, largeOpsRejections);
         builder.endObject();
         builder.humanReadableField(LIMIT_IN_BYTES, LIMIT, ByteSizeValue.ofBytes(memoryLimit));
         builder.endObject();

+ 2 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -1030,6 +1030,8 @@ public class NodeStatsTests extends ESTestCase {
         if (frequently()) {
             long maxStatValue = Long.MAX_VALUE / 5;
             indexingPressureStats = new IndexingPressureStats(
+                randomLongBetween(0, maxStatValue),
+                randomLongBetween(0, maxStatValue),
                 randomLongBetween(0, maxStatValue),
                 randomLongBetween(0, maxStatValue),
                 randomLongBetween(0, maxStatValue),

+ 7 - 3
server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java

@@ -115,7 +115,7 @@ public class ClusterStatsNodesTests extends ESTestCase {
             randomValueOtherThanMany(n -> n.getIndexingPressureStats() == null, NodeStatsTests::createNodeStats),
             randomValueOtherThanMany(n -> n.getIndexingPressureStats() == null, NodeStatsTests::createNodeStats)
         );
-        long[] expectedStats = new long[13];
+        long[] expectedStats = new long[14];
         for (NodeStats nodeStat : nodeStats) {
             IndexingPressureStats indexingPressureStats = nodeStat.getIndexingPressureStats();
             if (indexingPressureStats != null) {
@@ -133,8 +133,9 @@ public class ClusterStatsNodesTests extends ESTestCase {
                 expectedStats[9] += indexingPressureStats.getPrimaryRejections();
                 expectedStats[10] += indexingPressureStats.getReplicaRejections();
                 expectedStats[11] += indexingPressureStats.getPrimaryDocumentRejections();
+                expectedStats[12] += indexingPressureStats.getLargeOpsRejections();
 
-                expectedStats[12] += indexingPressureStats.getMemoryLimit();
+                expectedStats[13] += indexingPressureStats.getMemoryLimit();
             }
         }
 
@@ -187,9 +188,12 @@ public class ClusterStatsNodesTests extends ESTestCase {
                     + ","
                     + "\"primary_document_rejections\":"
                     + expectedStats[11]
+                    + ","
+                    + "\"large_operation_rejections\":"
+                    + expectedStats[12]
                     + "},"
                     + "\"limit_in_bytes\":"
-                    + expectedStats[12]
+                    + expectedStats[13]
                     + "}"
                     + "}}"
             )

+ 115 - 2
server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java

@@ -29,6 +29,7 @@ public class IndexingPressureTests extends ESTestCase {
         .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK_SIZE.getKey(), "1KB")
         .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), "9KB")
         .put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "128B")
+        .put(IndexingPressure.MAX_OPERATION_SIZE.getKey(), "128B")
         .build();
 
     public void testMemoryLimitSettingsFallbackToOldSingleLimitSetting() {
@@ -133,7 +134,7 @@ public class IndexingPressureTests extends ESTestCase {
         IndexingPressure indexingPressure = new IndexingPressure(settings);
         try (
             Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(1, 10, false);
-            Releasable primary = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(1, 15)
+            Releasable primary = indexingPressure.validateAndMarkPrimaryOperationLocalToCoordinatingNodeStarted(1, 15, 15, true)
         ) {
             IndexingPressureStats stats = indexingPressure.stats();
             assertEquals(10, stats.getCurrentCoordinatingBytes());
@@ -179,7 +180,7 @@ public class IndexingPressureTests extends ESTestCase {
 
             // Local to coordinating node primary actions not rejected
             IndexingPressureStats preLocalStats = indexingPressure.stats();
-            Releasable local = indexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(1, 1024 * 2);
+            Releasable local = indexingPressure.validateAndMarkPrimaryOperationLocalToCoordinatingNodeStarted(1, 1024 * 2, 1024 * 2, true);
             assertEquals(preLocalStats.getPrimaryRejections(), indexingPressure.stats().getPrimaryRejections());
             assertEquals(1024 * 6, indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
             assertEquals(preLocalStats.getCurrentPrimaryBytes() + 1024 * 2, indexingPressure.stats().getCurrentPrimaryBytes());
@@ -216,6 +217,118 @@ public class IndexingPressureTests extends ESTestCase {
         assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes());
     }
 
+    public void testPrimaryOperationExpansionAccounting() {
+        IndexingPressure indexingPressure = new IndexingPressure(settings);
+        // Primary limit is 12kb
+        try (
+            Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(2, 1024 * 3, false);
+            Releasable primary = indexingPressure.markPrimaryOperationStarted(2, 1024 * 3, false);
+        ) {
+            var opsExpansionReleasable = indexingPressure.trackPrimaryOperationExpansion(2, 1024 * 3, false);
+            assertEquals(0, indexingPressure.stats().getPrimaryRejections());
+            assertEquals(0, indexingPressure.stats().getPrimaryDocumentRejections());
+            assertEquals(1024 * 9, indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+
+            opsExpansionReleasable.close();
+        }
+
+        assertEquals(1024 * 6, indexingPressure.stats().getTotalPrimaryBytes());
+        // ensure that the expansion does not increase the number of ops twice
+        assertEquals(2, indexingPressure.stats().getTotalPrimaryOps());
+    }
+
+    public void testPrimaryOperationExpansionAccountingRejections() {
+        IndexingPressure indexingPressure = new IndexingPressure(settings);
+        // Primary limit is 12kb
+        try (
+            Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(2, 1024 * 3, false);
+            Releasable primary = indexingPressure.markPrimaryOperationStarted(2, 1024 * 3, false);
+        ) {
+            expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.trackPrimaryOperationExpansion(2, 1024 * 8, false));
+            assertEquals(1, indexingPressure.stats().getPrimaryRejections());
+            assertEquals(2, indexingPressure.stats().getPrimaryDocumentRejections());
+            assertEquals(1024 * 6, indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+
+            var forcedOpsExpansionReleasable = indexingPressure.trackPrimaryOperationExpansion(2, 1024 * 8, true);
+            assertEquals(1, indexingPressure.stats().getPrimaryRejections());
+            assertEquals(2, indexingPressure.stats().getPrimaryDocumentRejections());
+            assertEquals(1024 * 14, indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes());
+
+            forcedOpsExpansionReleasable.close();
+        }
+
+        assertEquals(1024 * 11, indexingPressure.stats().getTotalPrimaryBytes());
+        assertEquals(2, indexingPressure.stats().getTotalPrimaryOps());
+    }
+
+    public void testReplicaOperationExpansionAccounting() {
+        IndexingPressure indexingPressure = new IndexingPressure(settings);
+        // Replica limit is 15kb
+        try (
+            Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(2, 1024 * 3, false);
+            Releasable primary = indexingPressure.markPrimaryOperationStarted(2, 1024 * 3, false);
+            Releasable replica = indexingPressure.markReplicaOperationStarted(2, 1024 * 3, false);
+        ) {
+            var opsExpansionReleasable = indexingPressure.trackReplicaOperationExpansion(1024 * 3, false);
+            assertEquals(0, indexingPressure.stats().getReplicaRejections());
+            assertEquals(1024 * 6, indexingPressure.stats().getCurrentReplicaBytes());
+
+            opsExpansionReleasable.close();
+        }
+
+        assertEquals(1024 * 6, indexingPressure.stats().getTotalReplicaBytes());
+        // ensure that the expansion does not increase the number of ops twice
+        assertEquals(2, indexingPressure.stats().getTotalReplicaOps());
+    }
+
+    public void testReplicaOperationRejectionsExpansionAccounting() {
+        IndexingPressure indexingPressure = new IndexingPressure(settings);
+        // Replica limit is 15kb
+        try (
+            Releasable coordinating = indexingPressure.markCoordinatingOperationStarted(2, 1024 * 3, false);
+            Releasable primary = indexingPressure.markPrimaryOperationStarted(2, 1024 * 3, false);
+            Releasable replica = indexingPressure.markReplicaOperationStarted(2, 1024 * 3, false);
+        ) {
+            expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.trackReplicaOperationExpansion(1024 * 16, false));
+            assertEquals(1, indexingPressure.stats().getReplicaRejections());
+            assertEquals(1024 * 3, indexingPressure.stats().getCurrentReplicaBytes());
+
+            var forcedReplicaExpansionReleasable = indexingPressure.trackReplicaOperationExpansion(1024 * 16, true);
+            assertEquals(1, indexingPressure.stats().getReplicaRejections());
+            assertEquals(1024 * 19, indexingPressure.stats().getCurrentReplicaBytes());
+            forcedReplicaExpansionReleasable.close();
+        }
+
+        assertEquals(1024 * 19, indexingPressure.stats().getTotalReplicaBytes());
+        // ensure that the expansion does not increase the number of ops twice
+        assertEquals(2, indexingPressure.stats().getTotalReplicaOps());
+    }
+
+    public void testLargeOperationRejections() {
+        IndexingPressure indexingPressure = new IndexingPressure(settings);
+        // max operation size is 128b
+        indexingPressure.checkLargestPrimaryOperationIsWithinLimits(1, 1, false);
+        assertEquals(0L, indexingPressure.stats().getLargeOpsRejections());
+        assertEquals(0L, indexingPressure.stats().getTotalLargeRejectedOpsBytes());
+        assertEquals(0L, indexingPressure.stats().getPrimaryRejections());
+        assertEquals(0L, indexingPressure.stats().getPrimaryDocumentRejections());
+
+        expectThrows(
+            EsRejectedExecutionException.class,
+            () -> indexingPressure.checkLargestPrimaryOperationIsWithinLimits(12, 2048, false)
+        );
+        assertEquals(1L, indexingPressure.stats().getLargeOpsRejections());
+        assertEquals(1024L * 2, indexingPressure.stats().getTotalLargeRejectedOpsBytes());
+        assertEquals(1L, indexingPressure.stats().getPrimaryRejections());
+        assertEquals(12L, indexingPressure.stats().getPrimaryDocumentRejections());
+
+        indexingPressure.checkLargestPrimaryOperationIsWithinLimits(12, 2048, true);
+        assertEquals(2L, indexingPressure.stats().getLargeOpsRejections());
+        assertEquals(2 * 2048L, indexingPressure.stats().getTotalLargeRejectedOpsBytes());
+        assertEquals(1L, indexingPressure.stats().getPrimaryRejections());
+        assertEquals(12L, indexingPressure.stats().getPrimaryDocumentRejections());
+    }
+
     public void testForceExecutionOnCoordinating() {
         IndexingPressure indexingPressure = new IndexingPressure(settings);
         expectThrows(EsRejectedExecutionException.class, () -> indexingPressure.markCoordinatingOperationStarted(1, 1024 * 11, false));

+ 5 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

@@ -127,6 +127,11 @@ public class TransportBulkShardOperationsAction extends TransportWriteAction<
         return request.getOperations().size();
     }
 
+    @Override
+    protected long primaryLargestOperationSize(BulkShardOperationsRequest request) {
+        return request.getOperations().stream().mapToLong(Translog.Operation::estimateSize).max().orElse(0);
+    }
+
     public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
         final Translog.Operation operationWithPrimaryTerm;
         switch (operation.opType()) {

+ 2 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java

@@ -741,7 +741,8 @@ public class ClusterStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Cl
                         "coordinating_rejections": 0,
                         "primary_rejections": 0,
                         "replica_rejections": 0,
-                        "primary_document_rejections": 0
+                        "primary_document_rejections": 0,
+                        "large_operation_rejections":0
                       },
                       "limit_in_bytes": 0
                     }