Browse Source

Throttle indexing when disk IO throttling is disabled (#129245)

The threadpool-based merge scheduler triggers indexing throttling if merges are still getting enqueued faster than they're executed, while they are also disk IO unthrottled. This PR fixes the case where indexing throttling was incorrectly NOT triggered when disk IO throttling was disabled via the index settings.
Albert Zaharovits 4 months ago
parent
commit
7b8f4fb441

+ 5 - 0
docs/changelog/129245.yaml

@@ -0,0 +1,5 @@
+pr: 129245
+summary: Throttle indexing when disk IO throttling is disabled
+area: Engine
+type: enhancement
+issues: []

+ 2 - 2
server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

@@ -244,8 +244,8 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
         // both currently running and enqueued merge tasks are considered "active" for throttling purposes
         int activeMerges = (int) (submittedMergesCount - doneMergesCount);
         if (activeMerges > configuredMaxMergeCount
-            // only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load
-            && threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec()
+            // only throttle indexing if disk IO is un-throttled (if enabled), and we still can't keep up with the merge load
+            && (config.isAutoThrottle() == false || threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec())
             && shouldThrottleIncomingMerges.get() == false) {
             // maybe enable merge task throttling
             synchronized (shouldThrottleIncomingMerges) {

+ 21 - 7
server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

@@ -196,7 +196,15 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
         }
     }
 
-    public void testIndexingThrottlingWhenSubmittingMerges() {
+    public void testIndexingThrottlingWhenSubmittingMergesWithDiskIOThrottlingEnabled() {
+        testIndexingThrottlingWhenSubmittingMerges(true);
+    }
+
+    public void testIndexingThrottlingWhenSubmittingMergesWithDiskIOThrottlingDisabled() {
+        testIndexingThrottlingWhenSubmittingMerges(false);
+    }
+
+    private void testIndexingThrottlingWhenSubmittingMerges(boolean withDiskIOThrottlingEnabled) {
         final int maxThreadCount = randomIntBetween(1, 5);
         // settings validation requires maxMergeCount >= maxThreadCount
         final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5);
@@ -209,6 +217,7 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
         Settings mergeSchedulerSettings = Settings.builder()
             .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount)
             .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount)
+            .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), withDiskIOThrottlingEnabled)
             .build();
         TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
             new ShardId("index", "_na_", 1),
@@ -224,12 +233,12 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
         while (submittedMerges < mergesToSubmit - 1) {
             isUsingMaxTargetIORate.set(randomBoolean());
             if (submittedMergeTasks.isEmpty() == false && randomBoolean()) {
-                // maybe schedule one submitted merge
+                // maybe schedule one of the submitted merges (but still it's not run)
                 MergeTask mergeTask = randomFrom(submittedMergeTasks);
                 submittedMergeTasks.remove(mergeTask);
                 mergeTask.schedule();
             } else {
-                // submit one merge
+                // submit one new merge
                 MergeSource mergeSource = mock(MergeSource.class);
                 OneMerge oneMerge = mock(OneMerge.class);
                 when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
@@ -237,7 +246,7 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
                 when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
                 threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
                 submittedMerges++;
-                if (isUsingMaxTargetIORate.get() && submittedMerges > maxMergeCount) {
+                if ((isUsingMaxTargetIORate.get() || withDiskIOThrottlingEnabled == false) && submittedMerges > maxMergeCount) {
                     expectIndexThrottling = true;
                 } else if (submittedMerges <= maxMergeCount) {
                     expectIndexThrottling = false;
@@ -246,15 +255,20 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
             // assert IO throttle state
             assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
         }
-        // submit one last merge when IO throttling is at max value
-        isUsingMaxTargetIORate.set(true);
+        if (withDiskIOThrottlingEnabled) {
+            // submit one last merge when IO throttling is at max value
+            isUsingMaxTargetIORate.set(true);
+        } else {
+            // but if disk IO throttling is not enabled, indexing throttling should still be triggered
+            isUsingMaxTargetIORate.set(randomBoolean());
+        }
         MergeSource mergeSource = mock(MergeSource.class);
         OneMerge oneMerge = mock(OneMerge.class);
         when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
         when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
         when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
         threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
-        // assert index throttling because IO throttling is at max value
+        // assert indexing throttling state because IO throttling is at max value OR disk IO throttling is disabled
         assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(true));
     }