
[8.19] Threadpool merge executor does not block aborted merges (#129613) (#129728)

* Threadpool merge executor does not block aborted merges (#129613)

This PR addresses a bug where aborted merges are blocked if there's
insufficient disk space.

Previously, the merge disk space estimation did not consider whether the
operation had been aborted while it was enqueued for execution.
Consequently, aborted merges, e.g. when closing a shard, were blocked if
their disk space estimate exceeded the available disk space threshold. In
that case, the shard close operation would itself block.

This fix estimates a disk space budget of `0` for aborted merges, and it
periodically checks whether any enqueued merge tasks have been aborted
(more generally, whether the budget estimate for any merge task has
changed, reordering the queue if so). This way, aborted merges are
prioritized and never blocked (a sketch of the idea follows below).

Closes https://github.com/elastic/elasticsearch/issues/129335

* ClusterDisruptionIT.java

* [CI] Auto commit changes from spotless

---------

Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
Albert Zaharovits 4 months ago
parent
commit
1c30e34525
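
The core idea, as a minimal self-contained sketch (illustrative only, not the actual Elasticsearch implementation: it uses Map.Entry where the real queue pairs each task with a Tuple, and FakeMergeTask is a hypothetical stand-in for MergeTask). Entries are keyed by their budget estimate at enqueue time; a periodic pass re-applies the budget function and removes/reinserts any entry whose estimate changed, so an aborted merge, whose estimate drops to 0, floats to the head of the queue.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a merge task whose disk space estimate can change. */
final class FakeMergeTask {
    final String name;
    final long estimatedBytes;
    final AtomicBoolean aborted = new AtomicBoolean();

    FakeMergeTask(String name, long estimatedBytes) {
        this.name = name;
        this.estimatedBytes = estimatedBytes;
    }

    /** An aborted merge is assumed to stop writing soon, so it costs (roughly) nothing. */
    long estimatedRemainingMergeSize() {
        return aborted.get() ? 0L : estimatedBytes;
    }
}

public class AbortedMergeBudgetSketch {
    public static void main(String[] args) {
        // entries pair each task with its budget at enqueue time; the queue orders by that snapshot
        Comparator<Map.Entry<FakeMergeTask, Long>> byBudget = Comparator.comparingLong(Map.Entry::getValue);
        PriorityQueue<Map.Entry<FakeMergeTask, Long>> queue = new PriorityQueue<>(byBudget);

        FakeMergeTask big = new FakeMergeTask("big", 500L);
        FakeMergeTask small = new FakeMergeTask("small", 100L);
        queue.offer(Map.entry(big, big.estimatedRemainingMergeSize()));
        queue.offer(Map.entry(small, small.estimatedRemainingMergeSize()));

        long availableBudget = 50L; // neither task fits: merging is "blocked"
        big.aborted.set(true);      // e.g. the shard holding this merge is being closed

        // periodic re-check: re-apply the budget function and remove/reinsert entries whose
        // estimate changed, because PriorityQueue never reorders elements in place
        List<Map.Entry<FakeMergeTask, Long>> changed = new ArrayList<>();
        var it = queue.iterator();
        while (it.hasNext()) {
            var entry = it.next();
            long latest = entry.getKey().estimatedRemainingMergeSize();
            if (latest != entry.getValue()) {
                it.remove();
                changed.add(Map.entry(entry.getKey(), latest));
            }
        }
        queue.addAll(changed);

        // the aborted task now has budget 0, sorts to the head, and fits the available budget
        var head = queue.peek();
        System.out.println(head.getKey().name + " budget=" + head.getValue()
            + " runnable=" + (head.getValue() <= availableBudget));
    }
}

Running this prints that the aborted "big" task sorts to the head with budget 0 and is runnable within the available budget, which is exactly how the fix keeps aborted merges from blocking behind the disk space threshold.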

+ 159 - 0
server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java

@@ -0,0 +1,159 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.cluster.DiskUsageIntegTestCase;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.BeforeClass;
+
+import java.util.Locale;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
+    protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
+
+    @BeforeClass
+    public static void setAvailableDiskSpaceBufferLimit() {
+        // this has to be big in order to potentially accommodate the disk space for a few hundred docs and a few merges,
+        // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging
+        // operations at this high abstraction level (merging is triggered more or less automatically in the background)
+        MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            // only the threadpool-based merge scheduler has the capability to block merges when disk space is insufficient
+            .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
+            // the very short disk space polling interval ensures timely blocking of merges
+            .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "10ms")
+            // merges pile up more easily when there's only a few threads executing them
+            .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), randomIntBetween(1, 2))
+            .put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), MERGE_DISK_HIGH_WATERMARK_BYTES + "b")
+            // let's not worry about allocation watermarks (e.g. read-only shards) in this test suite
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0b")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0b")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
+            .build();
+    }
+
+    public void testShardCloseWhenDiskSpaceInsufficient() throws Exception {
+        String node = internalCluster().startNode();
+        setTotalSpace(node, Long.MAX_VALUE);
+        var indicesService = internalCluster().getInstance(IndicesService.class, node);
+        ensureStableCluster(1);
+        // create index
+        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createIndex(
+            indexName,
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
+        );
+        // do some indexing
+        indexRandom(
+            false,
+            false,
+            false,
+            false,
+            IntStream.range(1, randomIntBetween(2, 10))
+                .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                .toList()
+        );
+        // get current disk space usage
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
+        setTotalSpace(node, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
+        while (true) {
+            // maybe trigger a merge (this still depends on the merge policy, i.e. it is not 100% guaranteed)
+            assertNoFailures(indicesAdmin().prepareForceMerge(indexName).get());
+            // keep indexing and ask for merging until node stats' threadpool stats reports enqueued merges,
+            // and the merge executor says they're blocked due to insufficient disk space
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
+            if (nodesStatsResponse.getNodes()
+                .get(0)
+                .getThreadPool()
+                .stats()
+                .stream()
+                .filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
+                .findAny()
+                .get()
+                .queue() > 0
+                && indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace()) {
+                break;
+            }
+            // more indexing
+            indexRandom(
+                false,
+                false,
+                false,
+                false,
+                IntStream.range(1, randomIntBetween(2, 10))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("another_field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // now delete the index in this state, i.e. with merges enqueued and blocked
+        assertAcked(indicesAdmin().prepareDelete(indexName).get());
+        // index should now be gone
+        assertBusy(
+            () -> { expectThrows(IndexNotFoundException.class, () -> indicesAdmin().prepareGetIndex().setIndices(indexName).get()); }
+        );
+        assertBusy(() -> {
+            // merge thread pool should be done with the enqueued merge tasks
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
+            assertThat(
+                nodesStatsResponse.getNodes()
+                    .get(0)
+                    .getThreadPool()
+                    .stats()
+                    .stream()
+                    .filter(s -> ThreadPool.Names.MERGE.equals(s.name()))
+                    .findAny()
+                    .get()
+                    .queue(),
+                equalTo(0)
+            );
+            // and the merge executor should also report that merging is done now
+            assertFalse(indicesService.getThreadPoolMergeExecutorService().isMergingBlockedDueToInsufficientDiskSpace());
+            assertTrue(indicesService.getThreadPoolMergeExecutorService().allDone());
+        });
+    }
+
+    public void setTotalSpace(String dataNodeName, long totalSpace) {
+        getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
+        refreshClusterInfo();
+    }
+}

+ 50 - 18
server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

@@ -20,6 +20,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
 import org.elasticsearch.monitor.fs.FsInfo;
@@ -28,6 +29,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.IdentityHashMap;
@@ -59,10 +61,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
     /** How frequently we check disk usage (default: 5 seconds). */
     public static final Setting<TimeValue> INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING = Setting.positiveTimeSetting(
         "indices.merge.disk.check_interval",
-        // disabled by default
-        // there's currently a problem where (aborting) merges are blocked when shards are closed (because disk space is insufficient)
-        // see: https://github.com/elastic/elasticsearch/issues/129335
-        TimeValue.timeValueSeconds(0),
+        TimeValue.timeValueSeconds(5),
         Property.Dynamic,
         Property.NodeScope
     );
@@ -294,6 +293,10 @@ public class ThreadPoolMergeExecutorService implements Closeable {
         return queuedMergeTasks.isQueueEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L;
     }
 
+    public boolean isMergingBlockedDueToInsufficientDiskSpace() {
+        return availableDiskSpacePeriodicMonitor.isScheduled() && queuedMergeTasks.queueHeadIsOverTheAvailableBudget();
+    }
+
     /**
      * Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time.
      * A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level.
@@ -550,9 +553,8 @@ public class ThreadPoolMergeExecutorService implements Closeable {
 
     static class MergeTaskPriorityBlockingQueue extends PriorityBlockingQueueWithBudget<MergeTask> {
         MergeTaskPriorityBlockingQueue() {
-            // start with 0 budget (so takes on this queue will always block until {@link #updateBudget} is invoked)
-            // use the estimated *remaining* merge size as the budget function so that the disk space budget of taken (in-use) elements is
-            // updated according to the remaining disk space requirements of the currently running merge tasks
+            // by default, start with 0 budget (so take() calls on this queue will always block until the first {@link #updateBudget} is invoked)
+            // use the estimated *remaining* merge size as the budget function so that the disk space budget of elements is updated
             super(MergeTask::estimatedRemainingMergeSize, 0L);
         }
 
@@ -563,7 +565,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
 
         // exposed for tests
         MergeTask peekQueue() {
-            return enqueuedByBudget.peek();
+            return enqueuedByBudget.peek().v1();
         }
     }
 
@@ -573,7 +575,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
      */
     static class PriorityBlockingQueueWithBudget<E> {
         private final ToLongFunction<? super E> budgetFunction;
-        protected final PriorityQueue<E> enqueuedByBudget;
+        protected final PriorityQueue<Tuple<E, Long>> enqueuedByBudget;
         private final IdentityHashMap<ElementWithReleasableBudget, Long> unreleasedBudgetPerElement;
         private final ReentrantLock lock;
         private final Condition elementAvailable;
@@ -581,7 +583,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
 
         PriorityBlockingQueueWithBudget(ToLongFunction<? super E> budgetFunction, long initialAvailableBudget) {
             this.budgetFunction = budgetFunction;
-            this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(budgetFunction));
+            this.enqueuedByBudget = new PriorityQueue<>(64, Comparator.comparingLong(Tuple::v2));
             this.unreleasedBudgetPerElement = new IdentityHashMap<>();
             this.lock = new ReentrantLock();
             this.elementAvailable = lock.newCondition();
@@ -592,7 +594,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
             final ReentrantLock lock = this.lock;
             lock.lock();
             try {
-                enqueuedByBudget.offer(e);
+                enqueuedByBudget.offer(new Tuple<>(e, budgetFunction.applyAsLong(e)));
                 elementAvailable.signal();
             } finally {
                 lock.unlock();
@@ -608,14 +610,14 @@ public class ThreadPoolMergeExecutorService implements Closeable {
             final ReentrantLock lock = this.lock;
             lock.lockInterruptibly();
             try {
-                E peek;
-                long peekBudget;
+                Tuple<E, Long> head;
                 // blocks until the smallest budget element fits the currently available budget
-                while ((peek = enqueuedByBudget.peek()) == null || (peekBudget = budgetFunction.applyAsLong(peek)) > availableBudget) {
+                while ((head = enqueuedByBudget.peek()) == null || head.v2() > availableBudget) {
                     elementAvailable.await();
                 }
+                head = enqueuedByBudget.poll();
                 // deducts and holds up that element's budget from the available budget
-                return newElementWithReleasableBudget(enqueuedByBudget.poll(), peekBudget);
+                return newElementWithReleasableBudget(head.v1(), head.v2());
             } finally {
                 lock.unlock();
             }
@@ -623,7 +625,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
 
         /**
          * Updates the available budget given the passed-in argument, from which it deducts the budget held up by taken elements
-         * that are still in use. The budget of in-use elements is also updated (by re-applying the budget function).
+         * that are still in use. The elements' budget is also updated by re-applying the budget function.
          * The newly updated budget is used to potentially block {@link #take()} operations if the smallest-budget enqueued element
          * is over this newly computed available budget.
          */
@@ -632,9 +634,11 @@ public class ThreadPoolMergeExecutorService implements Closeable {
             lock.lock();
             try {
                 this.availableBudget = availableBudget;
-                // update the per-element budget (these are all the elements that are using any budget)
+                // updates the budget of enqueued elements (and possibly reorders the priority queue)
+                updateBudgetOfEnqueuedElementsAndReorderQueue();
+                // update the budget of dequeued, but still in-use elements (these are the elements that are consuming budget)
                 unreleasedBudgetPerElement.replaceAll((e, v) -> budgetFunction.applyAsLong(e.element()));
-                // available budget is decreased by the used per-element budget (for all dequeued elements that are still in use)
+                // the available budget is decreased by the budget of still in-use elements (dequeued elements that are still in-use)
                 this.availableBudget -= unreleasedBudgetPerElement.values().stream().mapToLong(i -> i).sum();
                 elementAvailable.signalAll();
             } finally {
@@ -642,10 +646,38 @@ public class ThreadPoolMergeExecutorService implements Closeable {
             }
         }
 
+        private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
+            assert this.lock.isHeldByCurrentThread();
+            int queueSizeBefore = enqueuedByBudget.size();
+            var it = enqueuedByBudget.iterator();
+            List<Tuple<E, Long>> elementsToReorder = new ArrayList<>();
+            while (it.hasNext()) {
+                var elementWithBudget = it.next();
+                Long previousBudget = elementWithBudget.v2();
+                long latestBudget = budgetFunction.applyAsLong(elementWithBudget.v1());
+                if (previousBudget.equals(latestBudget) == false) {
+                    // the budget (estimation) of an enqueued element has changed
+                    // this element will be reordered by removing and reinserting using the latest budget (estimation)
+                    it.remove();
+                    elementsToReorder.add(new Tuple<>(elementWithBudget.v1(), latestBudget));
+                }
+            }
+            // reinsert elements based on the latest budget (estimation)
+            for (var reorderedElement : elementsToReorder) {
+                enqueuedByBudget.offer(reorderedElement);
+            }
+            assert queueSizeBefore == enqueuedByBudget.size();
+        }
+
         boolean isQueueEmpty() {
             return enqueuedByBudget.isEmpty();
         }
 
+        boolean queueHeadIsOverTheAvailableBudget() {
+            var head = enqueuedByBudget.peek();
+            return head != null && head.v2() > availableBudget;
+        }
+
         int queueSize() {
             return enqueuedByBudget.size();
         }

+ 7 - 2
server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

@@ -537,8 +537,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
         long estimatedRemainingMergeSize() {
             // TODO is it possible for `estimatedMergeBytes` to be `0` for correctly initialized merges,
             // or is it always the case that `estimatedMergeBytes` being `0` means the merge has not yet been initialized?
-            long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes;
-            return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
+            if (onGoingMerge.getMerge().isAborted()) {
+                // if the merge is aborted the assumption is that merging will soon stop with negligible further writing
+                return 0L;
+            } else {
+                long estimatedMergeSize = onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes;
+                return Math.max(0L, estimatedMergeSize - rateLimiter.getTotalBytesWritten());
+            }
         }
 
         public long getMergeMemoryEstimateBytes() {

+ 100 - 0
server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceDiskSpaceTests.java

@@ -892,6 +892,106 @@ public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
         }
     }
 
+    public void testEnqueuedMergeTasksAreUnblockedWhenEstimatedMergeSizeChanges() throws Exception {
+        long diskSpaceLimitBytes = randomLongBetween(10L, 100L);
+        aFileStore.usableSpace = diskSpaceLimitBytes + randomLongBetween(1L, 100L);
+        aFileStore.totalSpace = aFileStore.usableSpace + randomLongBetween(1L, 10L);
+        bFileStore.usableSpace = diskSpaceLimitBytes + randomLongBetween(1L, 100L);
+        bFileStore.totalSpace = bFileStore.usableSpace + randomLongBetween(1L, 10L);
+        boolean aHasMoreSpace = aFileStore.usableSpace > bFileStore.usableSpace;
+        Settings.Builder settingsBuilder = Settings.builder().put(settings);
+        settingsBuilder.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), diskSpaceLimitBytes + "b");
+        try (
+            ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
+                .maybeCreateThreadPoolMergeExecutorService(
+                    testThreadPool,
+                    ClusterSettings.createBuiltInClusterSettings(settingsBuilder.build()),
+                    nodeEnvironment
+                )
+        ) {
+            assert threadPoolMergeExecutorService != null;
+            assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), greaterThanOrEqualTo(1));
+            final long availableBudget = aHasMoreSpace
+                ? aFileStore.usableSpace - diskSpaceLimitBytes
+                : bFileStore.usableSpace - diskSpaceLimitBytes;
+            final AtomicLong expectedAvailableBudget = new AtomicLong(availableBudget);
+            assertBusy(
+                () -> assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(expectedAvailableBudget.get()))
+            );
+            List<ThreadPoolMergeScheduler.MergeTask> tasksRunList = new ArrayList<>();
+            List<ThreadPoolMergeScheduler.MergeTask> tasksAbortList = new ArrayList<>();
+            int submittedMergesCount = randomIntBetween(1, 5);
+            long[] mergeSizeEstimates = new long[submittedMergesCount];
+            for (int i = 0; i < submittedMergesCount; i++) {
+                // all these merge estimates are over-budget
+                mergeSizeEstimates[i] = availableBudget + randomLongBetween(1L, 10L);
+            }
+            for (int i = 0; i < submittedMergesCount;) {
+                ThreadPoolMergeScheduler.MergeTask mergeTask = mock(ThreadPoolMergeScheduler.MergeTask.class);
+                when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean());
+                doAnswer(mock -> {
+                    Schedule schedule = randomFrom(Schedule.values());
+                    if (schedule == BACKLOG) {
+                        testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+                            // re-enqueue backlogged merge task
+                            threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask);
+                        });
+                    } else if (schedule == RUN) {
+                        tasksRunList.add(mergeTask);
+                    } else if (schedule == ABORT) {
+                        tasksAbortList.add(mergeTask);
+                    }
+                    return schedule;
+                }).when(mergeTask).schedule();
+                // randomly let some task complete
+                if (randomBoolean()) {
+                    // this task is not blocked
+                    when(mergeTask.estimatedRemainingMergeSize()).thenReturn(randomLongBetween(0L, availableBudget));
+                } else {
+                    // this task will initially be blocked because over-budget
+                    int finalI = i;
+                    doAnswer(mock -> mergeSizeEstimates[finalI]).when(mergeTask).estimatedRemainingMergeSize();
+                    i++;
+                }
+                assertTrue(threadPoolMergeExecutorService.submitMergeTask(mergeTask));
+            }
+            // assert tasks are blocked because their estimated merge size is over the available budget
+            assertBusy(() -> {
+                assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+                assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(submittedMergesCount));
+                assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(availableBudget));
+                assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(0));
+            });
+            // change estimates to be under the available budget
+            for (int i = 0; i < submittedMergesCount; i++) {
+                mergeSizeEstimates[i] = randomLongBetween(0L, availableBudget);
+            }
+            // assert tasks are all unblocked because their estimated merge size is now under the available budget
+            assertBusy(() -> {
+                assertFalse(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+                assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
+                assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(availableBudget));
+            });
+            // assert all merge tasks are either run or aborted
+            assertBusy(() -> {
+                for (ThreadPoolMergeScheduler.MergeTask mergeTask : tasksRunList) {
+                    verify(mergeTask, times(1)).run();
+                    verify(mergeTask, times(0)).abort();
+                }
+                for (ThreadPoolMergeScheduler.MergeTask mergeTask : tasksAbortList) {
+                    verify(mergeTask, times(0)).run();
+                    verify(mergeTask, times(1)).abort();
+                }
+            });
+        }
+        if (setThreadPoolMergeSchedulerSetting) {
+            assertWarnings(
+                "[indices.merge.scheduler.use_thread_pool] setting was deprecated in Elasticsearch "
+                    + "and will be removed in a future release. See the breaking changes documentation for the next major version."
+            );
+        }
+    }
+
     public void testMergeTasksAreUnblockedWhenMoreDiskSpaceBecomesAvailable() throws Exception {
         aFileStore.totalSpace = randomLongBetween(300L, 1_000L);
         bFileStore.totalSpace = randomLongBetween(300L, 1_000L);

+ 39 - 10
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -1755,7 +1755,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
      * @param builders       the documents to index.
      */
     public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) {
-        indexRandom(forceRefresh, dummyDocuments, true, builders);
+        indexRandom(forceRefresh, dummyDocuments, true, true, builders);
     }
 
     /**
@@ -1765,13 +1765,37 @@ public abstract class ESIntegTestCase extends ESTestCase {
      * segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
      * layout.
      *
-     * @param forceRefresh   if {@code true} all involved indices are refreshed once the documents are indexed.
-     * @param dummyDocuments if {@code true} some empty dummy documents may be randomly inserted into the document list and deleted once
-     *                       all documents are indexed. This is useful to produce deleted documents on the server side.
-     * @param maybeFlush     if {@code true} this method may randomly execute full flushes after index operations.
-     * @param builders       the documents to index.
+     * @param forceRefresh    if {@code true} all involved indices are refreshed once the documents are indexed.
+     * @param dummyDocuments  if {@code true} some empty dummy documents may be randomly inserted into the document list and deleted once
+     *                        all documents are indexed. This is useful to produce deleted documents on the server side.
+     * @param maybeFlush      if {@code true} this method may randomly execute full flushes after index operations.
+     * @param builders        the documents to index.
      */
     public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List<IndexRequestBuilder> builders) {
+        indexRandom(forceRefresh, dummyDocuments, maybeFlush, true, builders);
+    }
+
+    /**
+     * Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either
+     * indexes them in a blocking or async fashion. This is very useful to catch problems that relate to internal document
+     * ids or index segment creations. Some features might have bug when a given document is the first or the last in a
+     * segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
+     * layout.
+     *
+     * @param forceRefresh    if {@code true} all involved indices are refreshed once the documents are indexed.
+     * @param dummyDocuments  if {@code true} some empty dummy documents may be randomly inserted into the document list and deleted once
+     *                        all documents are indexed. This is useful to produce deleted documents on the server side.
+     * @param maybeFlush      if {@code true} this method may randomly execute full flushes after index operations.
+     * @param maybeForceMerge if {@code true} this method may randomly execute force merges after index operations.
+     * @param builders        the documents to index.
+     */
+    public void indexRandom(
+        boolean forceRefresh,
+        boolean dummyDocuments,
+        boolean maybeFlush,
+        boolean maybeForceMerge,
+        List<IndexRequestBuilder> builders
+    ) {
         Random random = random();
         Set<String> indices = new HashSet<>();
         builders = new ArrayList<>(builders);
@@ -1804,13 +1828,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
                         new LatchedActionListener<DocWriteResponse>(ActionListener.noop(), newLatch(inFlightAsyncOperations))
                             .delegateResponse((l, e) -> fail(e))
                     );
-                    postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush);
+                    postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush, maybeForceMerge);
                 }
             } else {
                 logger.info("Index [{}] docs async: [{}] bulk: [{}]", builders.size(), false, false);
                 for (IndexRequestBuilder indexRequestBuilder : builders) {
                     indexRequestBuilder.get();
-                    postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush);
+                    postIndexAsyncActions(indicesArray, inFlightAsyncOperations, maybeFlush, maybeForceMerge);
                 }
             }
         } else {
@@ -1889,7 +1913,12 @@ public abstract class ESIntegTestCase extends ESTestCase {
     /**
      * Maybe refresh, force merge, or flush then always make sure there aren't too many in flight async operations.
      */
-    private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlightAsyncOperations, boolean maybeFlush) {
+    private void postIndexAsyncActions(
+        String[] indices,
+        List<CountDownLatch> inFlightAsyncOperations,
+        boolean maybeFlush,
+        boolean maybeForceMerge
+    ) {
         if (rarely()) {
             if (rarely()) {
                 indicesAdmin().prepareRefresh(indices)
@@ -1899,7 +1928,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 indicesAdmin().prepareFlush(indices)
                     .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                     .execute(new LatchedActionListener<>(ActionListener.noop(), newLatch(inFlightAsyncOperations)));
-            } else if (rarely()) {
+            } else if (maybeForceMerge && rarely()) {
                 indicesAdmin().prepareForceMerge(indices)
                     .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                     .setMaxNumSegments(between(1, 10))
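
A usage sketch of the new five-argument overload (NoBackgroundMergeIT and the index name are hypothetical; the flags mirror how MergeWithLowDiskSpaceIT above keeps merging under the test's own control):

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.List;

public class NoBackgroundMergeIT extends ESIntegTestCase {
    public void testIndexingWithoutRandomFlushesOrForceMerges() {
        createIndex("test-index"); // hypothetical index name
        List<IndexRequestBuilder> builders = List.of(prepareIndex("test-index").setSource("field", "value"));
        // forceRefresh=false, dummyDocuments=false, maybeFlush=false, maybeForceMerge=false:
        // nothing but plain index operations runs, so the test alone decides when merges are requested
        indexRandom(false, false, false, false, builders);
    }
}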