Browse Source

Reduce size of MANAGEMENT threadpool on small node (#71171)

Today by default the `MANAGEMENT` threadpool always permits up to 5 threads
even if the node has a single CPU, which unfairly prioritises management
activities on small nodes. With this commit we cap the maximum size of this
threadpool at the number of allocated processors when that is less than 5;
on larger nodes the limit remains 5.

Relates #70435
David Turner 4 years ago
parent
commit
b690798348

+ 10 - 14
modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryBasicTests.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.index.reindex;
 
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.cluster.ClusterInfoService;
@@ -22,8 +23,6 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.InternalSettingsPlugin;
-import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -242,18 +241,15 @@ public class DeleteByQueryBasicTests extends ReindexTestCase {
             // it will trigger a retry policy in the delete by query request because the rest status of the block is 429
             enableIndexBlock("test", SETTING_READ_ONLY_ALLOW_DELETE);
             if (diskAllocationDeciderEnabled) {
-                InternalTestCluster internalTestCluster = internalCluster();
-                InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster
-                    .getInstance(ClusterInfoService.class, internalTestCluster.getMasterName());
-                ThreadPool threadPool = internalTestCluster.getInstance(ThreadPool.class, internalTestCluster.getMasterName());
-                // Refresh the cluster info after a random delay to check the disk threshold and release the block on the index
-                threadPool.schedule(
-                        () -> ClusterInfoServiceUtils.refresh(infoService),
-                        TimeValue.timeValueMillis(randomIntBetween(1, 100)),
-                        ThreadPool.Names.MANAGEMENT);
-                // The delete by query request will be executed successfully because the block will be released
-                assertThat(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true).get(),
-                    matcher().deleted(docs));
+                // Fire off the delete-by-query first
+                final ActionFuture<BulkByScrollResponse> deleteByQueryResponse
+                    = deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true).execute();
+                // Then refresh the cluster info which checks the disk threshold and releases the block on the index
+                final InternalClusterInfoService clusterInfoService
+                        = (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
+                ClusterInfoServiceUtils.refresh(clusterInfoService);
+                // The delete by query request will be executed successfully because it retries after the block is released
+                assertThat(deleteByQueryResponse.actionGet(), matcher().deleted(docs));
             } else {
                 // The delete by query request will not be executed successfully because the block cannot be released
                 assertThat(deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true)

+ 1 - 6
qa/smoke-test-http/src/test/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java

@@ -85,7 +85,6 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT
             }
         }
         assertThat(searcherBlocks, not(empty()));
-
         final List<Releasable> releasables = new ArrayList<>();
         try {
             for (final Semaphore searcherBlock : searcherBlocks) {
@@ -107,11 +106,7 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT
                 }
             });
 
-            logger.info("--> waiting for task to start");
-            assertBusy(() -> {
-                final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
-                assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(actionPrefix)));
-            });
+            awaitTaskWithPrefix(actionPrefix);
 
             logger.info("--> waiting for at least one task to hit a block");
             assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));

+ 1 - 5
qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java

@@ -84,11 +84,7 @@ public class ClusterStateRestCancellationIT extends HttpSmokeTestCase {
             }
         });
 
-        logger.info("--> waiting for task to start");
-        assertBusy(() -> {
-            final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
-            assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().equals(ClusterStateAction.NAME)));
-        });
+        awaitTaskWithPrefix(ClusterStateAction.NAME);
 
         logger.info("--> cancelling cluster state request");
         cancellable.cancel();

+ 1 - 5
qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStatsRestCancellationIT.java

@@ -114,11 +114,7 @@ public class ClusterStatsRestCancellationIT extends HttpSmokeTestCase {
                 }
             });
 
-            logger.info("--> waiting for task to start");
-            assertBusy(() -> {
-                final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
-                assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(ClusterStatsAction.NAME)));
-            });
+            awaitTaskWithPrefix(ClusterStatsAction.NAME);
 
             logger.info("--> waiting for at least one task to hit a block");
             assertBusy(() -> assertTrue(statsBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));

+ 12 - 0
qa/smoke-test-http/src/test/java/org/elasticsearch/http/HttpSmokeTestCase.java

@@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.transport.Netty4Plugin;
+import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.nio.MockNioTransportPlugin;
 import org.elasticsearch.transport.nio.NioTransportPlugin;
 import org.junit.BeforeClass;
@@ -76,4 +77,15 @@ public abstract class HttpSmokeTestCase extends ESIntegTestCase {
         return true;
     }
 
+    protected void awaitTaskWithPrefix(String actionPrefix) throws Exception {
+        logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);
+        assertBusy(() -> {
+            for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
+                if (transportService.getTaskManager().getTasks().values().stream().anyMatch(t -> t.getAction().startsWith(actionPrefix))) {
+                    return;
+                }
+            }
+            fail("no task with prefix [" + actionPrefix + "] found");
+        });
+    }
 }

+ 1 - 5
qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesRecoveryRestCancellationIT.java

@@ -82,11 +82,7 @@ public class IndicesRecoveryRestCancellationIT extends HttpSmokeTestCase {
                 }
             });
 
-            logger.info("--> waiting for task to start");
-            assertBusy(() -> {
-                final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
-                assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(RecoveryAction.NAME)));
-            });
+            awaitTaskWithPrefix(RecoveryAction.NAME);
 
             logger.info("--> waiting for at least one task to hit a block");
             assertBusy(() -> assertTrue(operationBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));

+ 11 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java

@@ -35,8 +35,17 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
 
     @Inject
     public TransportCancelTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
-        super(CancelTasksAction.NAME, clusterService, transportService, actionFilters,
-            CancelTasksRequest::new, CancelTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
+        super(
+                CancelTasksAction.NAME,
+                clusterService,
+                transportService,
+                actionFilters,
+                CancelTasksRequest::new,
+                CancelTasksResponse::new,
+                TaskInfo::new,
+                // Cancellation is usually lightweight, and runs on the transport thread if the task didn't even start yet, but some
+                // implementations of CancellableTask#onCancelled() are nontrivial so we use GENERIC here. TODO could it be SAME?
+                ThreadPool.Names.GENERIC);
     }
 
     @Override

+ 2 - 1
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -161,7 +161,8 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
         builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false));
         builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, true));
         builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true));
-        builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
+        builders.put(Names.MANAGEMENT,
+                new ScalingExecutorBuilder(Names.MANAGEMENT, 1, boundedBy(allocatedProcessors, 1, 5), TimeValue.timeValueMinutes(5)));
         builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
         builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
         builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

+ 1 - 1
server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java

@@ -91,7 +91,7 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
     private int expectedSize(final String threadPoolName, final int numberOfProcessors) {
         final Map<String, Function<Integer, Integer>> sizes = new HashMap<>();
         sizes.put(ThreadPool.Names.GENERIC, n -> ThreadPool.boundedBy(4 * n, 128, 512));
-        sizes.put(ThreadPool.Names.MANAGEMENT, n -> 5);
+        sizes.put(ThreadPool.Names.MANAGEMENT, n -> ThreadPool.boundedBy(n, 1, 5));
         sizes.put(ThreadPool.Names.FLUSH, ThreadPool::halfAllocatedProcessorsMaxFive);
         sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
         sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive);