Browse Source

Increase force_merge threadpool size (#87082)

Changes the default size used for the force_merge threadpool to 1/8 of
the allocated processors, with a minimum value of 1.

Closes #84943
Pooya Salehi 3 years ago
parent
commit
beadcaf631

+ 6 - 0
docs/changelog/87082.yaml

@@ -0,0 +1,6 @@
+pr: 87082
+summary: Increase `force_merge` threadpool size based on the allocated processors
+area: Engine
+type: enhancement
+issues:
+ - 84943

+ 2 - 1
docs/reference/modules/threadpool.asciidoc

@@ -79,7 +79,8 @@ There are several thread pools, but the important ones include:
 
 `force_merge`::
     For <<indices-forcemerge,force merge>> operations.
-    Thread pool type is `fixed` with a size of 1 and an unbounded queue size.
+    Thread pool type is `fixed` with a size of `max(1, (`<<node.processors,
+`# of allocated processors`>>`) / 8)` and an unbounded queue size.
 
 `management`::
     For cluster management.

+ 8 - 1
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -219,7 +219,10 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
             Names.FETCH_SHARD_STARTED,
             new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false)
         );
-        builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
+        builders.put(
+            Names.FORCE_MERGE,
+            new FixedExecutorBuilder(settings, Names.FORCE_MERGE, oneEighthAllocatedProcessors(allocatedProcessors), -1, false)
+        );
         builders.put(Names.CLUSTER_COORDINATION, new FixedExecutorBuilder(settings, Names.CLUSTER_COORDINATION, 1, -1, false));
         builders.put(
             Names.FETCH_SHARD_STORE,
@@ -551,6 +554,10 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         return boundedBy(2 * allocatedProcessors, 2, Integer.MAX_VALUE);
     }
 
+    static int oneEighthAllocatedProcessors(final int allocatedProcessors) {
+        return boundedBy(allocatedProcessors / 8, 1, Integer.MAX_VALUE);
+    }
+
     public static int searchThreadPoolSize(final int allocatedProcessors) {
         return ((allocatedProcessors * 3) / 2) + 1;
     }

+ 24 - 0
server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java

@@ -51,6 +51,13 @@ public class ThreadPoolTests extends ESTestCase {
         assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value));
     }
 
+    public void testOneEighthAllocatedProcessors() {
+        assertThat(ThreadPool.oneEighthAllocatedProcessors(1), equalTo(1));
+        assertThat(ThreadPool.oneEighthAllocatedProcessors(4), equalTo(1));
+        assertThat(ThreadPool.oneEighthAllocatedProcessors(8), equalTo(1));
+        assertThat(ThreadPool.oneEighthAllocatedProcessors(32), equalTo(4));
+    }
+
     public void testAbsoluteTime() throws Exception {
         TestThreadPool threadPool = new TestThreadPool("test");
         try {
@@ -304,4 +311,21 @@ public class ThreadPoolTests extends ESTestCase {
             assertTrue(terminate(threadPool));
         }
     }
+
+    public void testForceMergeThreadPoolSize() {
+        final int allocatedProcessors = randomIntBetween(1, EsExecutors.allocatedProcessors(Settings.EMPTY));
+        final ThreadPool threadPool = new TestThreadPool(
+            "test",
+            Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), allocatedProcessors).build()
+        );
+        try {
+            final int expectedSize = Math.max(1, allocatedProcessors / 8);
+            ThreadPool.Info info = threadPool.info(ThreadPool.Names.FORCE_MERGE);
+            assertThat(info.getThreadPoolType(), equalTo(ThreadPool.ThreadPoolType.FIXED));
+            assertThat(info.getMin(), equalTo(expectedSize));
+            assertThat(info.getMax(), equalTo(expectedSize));
+        } finally {
+            assertTrue(terminate(threadPool));
+        }
+    }
 }