Browse Source

Send max of two types of max queue latency to ClusterInfo (#132675)

The TransportNodeUsageStatsForThreadPoolsAction now takes the max of two
values: the latency of any task currently queued in the write thread pool
queue, and the previously collected max queue latency of any task dequeued since
the last call. This covers the possibility that queue times can rise
greatly before being reflected in execution: imagine all the write
threads are stalled or have long-running tasks. This action feeds a max
queue latency stat to the ClusterInfo. Follow up from ES-12233.

Adds additional IT testing to exercise both forms of queue latency, a
followup for ES-12316.

-------------

Completing the follow up testing [Henning requested
previously](https://github.com/elastic/elasticsearch/pull/131480/files#r2243610807).
Dianna Hohensee 2 months ago
parent
commit
97a6dc8523

+ 5 - 0
docs/changelog/132675.yaml

@@ -0,0 +1,5 @@
+pr: 132675
+summary: Add second max queue latency stat to `ClusterInfo`
+area: Allocation
+type: enhancement
+issues: []

+ 178 - 6
server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java

@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDeci
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
@@ -59,6 +60,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.unmodifiableSet;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.common.util.set.Sets.newHashSet;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -346,6 +349,8 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
                 WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
                 WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
             )
+            // Manually control cluster info refreshes
+            .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
             .build();
         var masterName = internalCluster().startMasterOnlyNode(settings);
         var dataNodeName = internalCluster().startDataOnlyNode(settings);
@@ -369,11 +374,8 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
             }
         );
 
-        // Do some writes to create some write thread pool activity.
-        final String indexName = randomIdentifier();
-        for (int i = 0; i < randomIntBetween(1, 1000); i++) {
-            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
-        }
+        // Generate some writes to get some non-zero write thread pool stats.
+        doALotOfDataNodeWrites();
 
         // Force a refresh of the ClusterInfo state to collect fresh info from the data nodes.
         final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
@@ -387,7 +389,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
 
         final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
         logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
-        assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collectedg
+        assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
         var dataNodeId = getNodeId(dataNodeName);
         var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
         assertNotNull(nodeUsageStatsForThreadPool);
@@ -400,4 +402,174 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
         assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
         assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
     }
+
+    /**
+     * The {@link TransportNodeUsageStatsForThreadPoolsAction} returns the max value of two kinds of queue latencies:
+     * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#getMaxQueueLatencyMillisSinceLastPollAndReset()} and
+     * {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis()}. The latter looks at currently queued tasks,
+     * and the former tracks the queue latency of tasks when they are taken off of the queue to start execution.
+     */
+    public void testMaxQueueLatenciesInClusterInfo() throws Exception {
+        var settings = Settings.builder()
+            .put(
+                WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
+                WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+            )
+            // Manually control cluster info refreshes
+            .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
+            .build();
+        var masterName = internalCluster().startMasterOnlyNode(settings);
+        var dataNodeName = internalCluster().startDataOnlyNode(settings);
+        ensureStableCluster(2);
+        assertEquals(internalCluster().getMasterName(), masterName);
+        assertNotEquals(internalCluster().getMasterName(), dataNodeName);
+        logger.info("---> master node: " + masterName + ", data node: " + dataNodeName);
+
+        // Block indexing on the data node by submitting write thread pool tasks equal to the number of write threads.
+        var barrier = blockDataNodeIndexing(dataNodeName);
+        try {
+            // Submit an arbitrary number of tasks (at least one), which will queue because all the write threads are already occupied.
+            // Strictly speaking, only a single queued task is needed to occupy the queue.
+            int numberOfTasks = randomIntBetween(1, 5);
+            Thread[] threadsToJoin = new Thread[numberOfTasks];
+            String indexName = randomIdentifier();
+            createIndex(
+                indexName,
+                // NB: Set 0 replicas so that there aren't any stray GlobalCheckpointSyncAction tasks on the write thread pool.
+                Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 5)).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
+            );
+            for (int i = 0; i < numberOfTasks; ++i) {
+                threadsToJoin[i] = startParallelSingleWrite(indexName);
+            }
+
+            // Reach into the data node's write thread pool to check that tasks have reached the queue.
+            var dataNodeThreadPool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
+            var writeExecutor = dataNodeThreadPool.executor(ThreadPool.Names.WRITE);
+            assert writeExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor;
+            var trackingWriteExecutor = (TaskExecutionTimeTrackingEsThreadPoolExecutor) writeExecutor;
+            assertBusy(
+                // Wait for the parallel threads' writes to get queued in the write thread pool.
+                () -> assertThat(
+                    "Write thread pool dump: " + trackingWriteExecutor,
+                    trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
+                    greaterThan(0L)
+                )
+            );
+
+            // Force a refresh of the ClusterInfo state to collect fresh info from the data node.
+            final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
+                InternalClusterInfoService.class,
+                internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
+            );
+            final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+
+            // Since tasks are actively queued right now, #peekMaxQueueLatencyInQueueMillis, which is called from the
+            // TransportNodeUsageStatsForThreadPoolsAction that a ClusterInfoService refresh initiates, should return a max queue
+            // latency > 0.
+            {
+                final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
+                logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+                assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data node should be collected
+                var dataNodeId = getNodeId(dataNodeName);
+                var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+                assertNotNull(nodeUsageStatsForThreadPool);
+                logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+                assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+                var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+                assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+                assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+                assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0f));
+                assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThan(0L));
+            }
+
+            // Now release the data node's indexing, and drain the queued tasks. Max queue latency of executed (not queued) tasks is reset
+            // by each TransportNodeUsageStatsForThreadPoolsAction call (#getMaxQueueLatencyMillisSinceLastPollAndReset), so the new queue
+            // latencies will be present in the next call. There will be nothing in the queue to peek at now, so the max queue latency
+            // reported by TransportNodeUsageStatsForThreadPoolsAction will reflect
+            // #getMaxQueueLatencyMillisSinceLastPollAndReset and not #peekMaxQueueLatencyInQueueMillis.
+            barrier.await();
+            for (int i = 0; i < numberOfTasks; ++i) {
+                threadsToJoin[i].join();
+            }
+            assertThat(
+                "Unexpectedly found a task queued for the write thread pool. Write thread pool dump: " + trackingWriteExecutor,
+                trackingWriteExecutor.peekMaxQueueLatencyInQueueMillis(),
+                equalTo(0L)
+            );
+
+            final ClusterInfo nextClusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+            {
+                final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = nextClusterInfo
+                    .getNodeUsageStatsForThreadPools();
+                logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+                assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
+                var dataNodeId = getNodeId(dataNodeName);
+                var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+                assertNotNull(nodeUsageStatsForThreadPool);
+                logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+                assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+                var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+                assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+                assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+                assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThan(0f));
+                assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
+            }
+        } finally {
+            // Ensure that the write threads have been released by breaking the barrier for any callers still waiting on it. If the
+            // callers have already all been successfully released, then there will be nothing left to release.
+            logger.info("---> Ensuring release of the barrier on write thread pool tasks");
+            barrier.reset();
+        }
+
+        // Now that there's nothing in the queue, and no activity since the last ClusterInfo refresh, the max latency returned should be
+        // zero. Verify this.
+        final InternalClusterInfoService masterClusterInfoService = asInstanceOf(
+            InternalClusterInfoService.class,
+            internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class)
+        );
+        final ClusterInfo clusterInfo = ClusterInfoServiceUtils.refresh(masterClusterInfoService);
+        {
+            final Map<String, NodeUsageStatsForThreadPools> usageStatsForThreadPools = clusterInfo.getNodeUsageStatsForThreadPools();
+            logger.info("---> Thread pool usage stats reported by data nodes to the master: " + usageStatsForThreadPools);
+            assertThat(usageStatsForThreadPools.size(), equalTo(1)); // only stats from data nodes should be collected
+            var dataNodeId = getNodeId(dataNodeName);
+            var nodeUsageStatsForThreadPool = usageStatsForThreadPools.get(dataNodeId);
+            assertNotNull(nodeUsageStatsForThreadPool);
+            logger.info("---> Data node's thread pool stats: " + nodeUsageStatsForThreadPool);
+
+            assertEquals(dataNodeId, nodeUsageStatsForThreadPool.nodeId());
+            var writeThreadPoolStats = nodeUsageStatsForThreadPool.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
+            assertNotNull("Expected to find stats for the WRITE thread pool", writeThreadPoolStats);
+            assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThan(0));
+            assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), equalTo(0f));
+            assertThat(writeThreadPoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L));
+        }
+    }
+
+    /**
+     * Do some writes to create some write thread pool activity.
+     */
+    private void doALotOfDataNodeWrites() {
+        final String indexName = randomIdentifier();
+        final int randomInt = randomIntBetween(500, 1000);
+        for (int i = 0; i < randomInt; i++) {
+            index(indexName, Integer.toString(i), Collections.singletonMap("foo", "bar"));
+        }
+    }
+
+    /**
+     * Starts a single index request on a parallel thread and returns the reference so {@link Thread#join()} can be called eventually.
+     */
+    private Thread startParallelSingleWrite(String indexName) {
+        Thread running = new Thread(() -> doSingleWrite(indexName));
+        running.start();
+        return running;
+    }
+
+    private void doSingleWrite(String indexName) {
+        final int randomId = randomIntBetween(500, 1000);
+        index(indexName, Integer.toString(randomId), Collections.singletonMap("foo", "bar"));
+    }
 }

+ 4 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodeUsageStatsForThreadPoolsAction.java

@@ -104,7 +104,10 @@ public class TransportNodeUsageStatsForThreadPoolsAction extends TransportNodesA
             (float) trackingForWriteExecutor.pollUtilization(
                 TaskExecutionTimeTrackingEsThreadPoolExecutor.UtilizationTrackingPurpose.ALLOCATION
             ),
-            trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset()
+            Math.max(
+                trackingForWriteExecutor.getMaxQueueLatencyMillisSinceLastPollAndReset(),
+                trackingForWriteExecutor.peekMaxQueueLatencyInQueueMillis()
+            )
         );
 
         Map<String, ThreadPoolUsageStats> perThreadPool = new HashMap<>();

+ 14 - 7
server/src/main/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutor.java

@@ -151,7 +151,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
      * Returns the max queue latency seen since the last time that this method was called. Every call will reset the max seen back to zero.
      * Latencies are only observed as tasks are taken off of the queue. This means that tasks in the queue will not contribute to the max
      * latency until they are unqueued and handed to a thread to execute. To see the latency of tasks still in the queue, use
-     * {@link #peekMaxQueueLatencyInQueue}. If there have been no tasks in the queue since the last call, then zero latency is returned.
+     * {@link #peekMaxQueueLatencyInQueueMillis}. If there have been no tasks in the queue since the last call, then zero latency is
+     * returned.
      */
     public long getMaxQueueLatencyMillisSinceLastPollAndReset() {
         if (trackMaxQueueLatency == false) {
@@ -164,23 +165,29 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
      * Returns the queue latency of the next task to be executed that is still in the task queue. Essentially peeks at the front of the
      * queue and calculates how long it has been there. Returns zero if there is no queue.
      */
-    public long peekMaxQueueLatencyInQueue() {
+    public long peekMaxQueueLatencyInQueueMillis() {
         if (trackMaxQueueLatency == false) {
             return 0;
         }
+
         var queue = getQueue();
-        if (queue.isEmpty()) {
+        assert queue instanceof LinkedTransferQueue || queue instanceof SizeBlockingQueue
+            : "Not the type of queue expected: " + queue.getClass();
+        var linkedTransferOrSizeBlockingQueue = queue instanceof LinkedTransferQueue
+            ? (LinkedTransferQueue) queue
+            : (SizeBlockingQueue) queue;
+
+        var task = linkedTransferOrSizeBlockingQueue.peek();
+        if (task == null) {
+            // There's nothing in the queue right now.
             return 0;
         }
-        assert queue instanceof LinkedTransferQueue : "Not the type of queue expected: " + queue.getClass();
-        var linkedTransferQueue = (LinkedTransferQueue) queue;
 
-        var task = linkedTransferQueue.peek();
         assert task instanceof WrappedRunnable : "Not the type of task expected: " + task.getClass();
         var wrappedTask = ((WrappedRunnable) task).unwrap();
         assert wrappedTask instanceof TimedRunnable : "Not the type of task expected: " + task.getClass();
         var timedTask = (TimedRunnable) wrappedTask;
-        return timedTask.getTimeSinceCreationNanos();
+        return TimeUnit.NANOSECONDS.toMillis(timedTask.getTimeSinceCreationNanos());
     }
 
     /**

+ 17 - 0
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -1024,6 +1024,23 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler,
             return builder;
         }
 
+        @Override
+        public String toString() {
+            return "Info[name="
+                + name
+                + ",type="
+                + type
+                + ",min="
+                + min
+                + ",max="
+                + max
+                + ",keepAlive="
+                + keepAlive
+                + ",queueSize="
+                + queueSize
+                + "]";
+        }
+
     }
 
     /**

+ 7 - 7
server/src/test/java/org/elasticsearch/common/util/concurrent/TaskExecutionTimeTrackingEsThreadPoolExecutorTests.java

@@ -95,7 +95,7 @@ public class TaskExecutionTimeTrackingEsThreadPoolExecutorTests extends ESTestCa
 
     /**
      * Verifies that we can peek at the task in front of the task queue to fetch the duration that the oldest task has been queued.
-     * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueue}.
+     * Tests {@link TaskExecutionTimeTrackingEsThreadPoolExecutor#peekMaxQueueLatencyInQueueMillis}.
      */
     public void testFrontOfQueueLatency() throws Exception {
         ThreadContext context = new ThreadContext(Settings.EMPTY);
@@ -135,7 +135,7 @@ public class TaskExecutionTimeTrackingEsThreadPoolExecutorTests extends ESTestCa
             logger.info("--> executor: {}", executor);
 
             // Check that peeking at a non-existent queue returns zero.
-            assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueue());
+            assertEquals("Zero should be returned when there is no queue", 0, executor.peekMaxQueueLatencyInQueueMillis());
 
             // Submit two tasks, into the thread pool with a single worker thread. The second one will be queued (because the pool only has
             // one thread) and can be peeked at.
@@ -143,10 +143,10 @@ public class TaskExecutionTimeTrackingEsThreadPoolExecutorTests extends ESTestCa
             executor.execute(() -> {});
 
             waitForTimeToElapse();
-            var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
+            var frontOfQueueDuration = executor.peekMaxQueueLatencyInQueueMillis();
             assertThat("Expected a task to be queued", frontOfQueueDuration, greaterThan(0L));
             waitForTimeToElapse();
-            var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueue();
+            var updatedFrontOfQueueDuration = executor.peekMaxQueueLatencyInQueueMillis();
             assertThat(
                 "Expected a second peek to report a longer duration",
                 updatedFrontOfQueueDuration,
@@ -156,7 +156,7 @@ public class TaskExecutionTimeTrackingEsThreadPoolExecutorTests extends ESTestCa
             // Release the first task that's running, and wait for the second to start -- then it is ensured that the queue will be empty.
             safeAwait(barrier);
             safeAwait(barrier);
-            assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueue());
+            assertEquals("Queue should be emptied", 0, executor.peekMaxQueueLatencyInQueueMillis());
         } finally {
             ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
         }
@@ -463,8 +463,8 @@ public class TaskExecutionTimeTrackingEsThreadPoolExecutorTests extends ESTestCa
      */
     private static void waitForTimeToElapse() throws InterruptedException {
         final var startNanoTime = System.nanoTime();
-        while ((System.nanoTime() - startNanoTime) < 1) {
-            Thread.sleep(Duration.ofNanos(1));
+        while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanoTime, TimeUnit.NANOSECONDS) < 1) {
+            Thread.sleep(Duration.ofMillis(1));
         }
     }
 }

+ 39 - 0
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -163,6 +163,7 @@ import org.elasticsearch.test.disruption.NetworkDisruption;
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.store.MockFSIndexStore;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportInterceptor;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestHandler;
@@ -204,11 +205,14 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -2915,4 +2919,39 @@ public abstract class ESIntegTestCase extends ESTestCase {
             )
         );
     }
+
+    /**
+     * Submits as many tasks to the given data node's write thread pool as there are write threads. These tasks will wait on the barrier
+     * that is returned, which waits for total-write-threads + 1 callers. The caller can release the tasks by calling
+     * {@code barrier.await()} or abort their wait with {@code barrier.reset()}, which breaks the barrier for any waiting task.
+     */
+    public CyclicBarrier blockDataNodeIndexing(String dataNodeName) {
+        // Block the executor workers to simulate long-running write tasks
+        var threadpool = internalCluster().getInstance(ThreadPool.class, dataNodeName);
+        var executor = threadpool.executor(ThreadPool.Names.WRITE);
+        final var executorInfo = threadpool.info(ThreadPool.Names.WRITE);
+        final var executorThreads = executorInfo.getMax();
+        var barrier = new CyclicBarrier(executorThreads + 1);
+        for (int i = 0; i < executorThreads; i++) {
+            executor.execute(() -> longAwait(barrier));
+        }
+        logger.info(
+            "---> Submitted ["
+                + executorThreads
+                + "] tasks to the write thread pool that will wait on a barrier until released. Write thread pool info: "
+                + executorInfo
+        );
+        return barrier;
+    }
+
+    private static void longAwait(CyclicBarrier barrier) {
+        try {
+            barrier.await(30, TimeUnit.SECONDS);
+        } catch (BrokenBarrierException | TimeoutException e) {
+            throw new AssertionError(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AssertionError(e);
+        }
+    }
 }