Browse Source

Remove the index thread pool (#29556)

Now that single-document indexing requests are executed on the bulk
thread pool the index thread pool is no longer needed. This commit
removes this thread pool from Elasticsearch.
Jason Tedor 7 years ago
parent
commit
2b47d67d95

+ 0 - 2
docs/reference/cat/thread_pool.asciidoc

@@ -22,7 +22,6 @@ node-0 flush               0 0 0
 node-0 force_merge         0 0 0
 node-0 generic             0 0 0
 node-0 get                 0 0 0
-node-0 index               0 0 0
 node-0 listener            0 0 0
 node-0 management          1 0 0
 node-0 refresh             0 0 0
@@ -52,7 +51,6 @@ flush
 force_merge
 generic
 get
-index
 listener
 management
 refresh

+ 9 - 1
docs/reference/migration/migrate_7_0/settings.asciidoc

@@ -5,4 +5,12 @@
 ==== Percolator
 
 * The deprecated `index.percolator.map_unmapped_fields_as_string` setting has been removed in favour of
-  the `index.percolator.map_unmapped_fields_as_text` setting.
+  the `index.percolator.map_unmapped_fields_as_text` setting.
+
+==== Index thread pool
+
+* Internally, single-document index/delete/update requests are executed as bulk
+  requests with a single-document payload. This means that these requests are
+  executed on the bulk thread pool. As such, the indexing thread pool is no
+  longer needed and has been removed. As such, the  settings
+  `thread_pool.index.size` and `thread_pool.index.queue_size` have been removed.

+ 3 - 9
docs/reference/modules/threadpool.asciidoc

@@ -13,12 +13,6 @@ There are several thread pools, but the important ones include:
     For generic operations (e.g., background node discovery).
     Thread pool type is `scaling`.
 
-`index`::
-    For index/delete operations. Thread pool type is `fixed`
-    with a size of `# of available processors`,
-    queue_size of `200`.  The maximum size for this pool
-    is `1 + # of available processors`.
-
 `search`::
     For count/search/suggest operations. Thread pool type is
     `fixed_auto_queue_size` with a size of
@@ -55,13 +49,13 @@ There are several thread pools, but the important ones include:
     Mainly for java client executing of action when listener threaded is set to true.
     Thread pool type is `scaling` with a default max of `min(10, (# of available processors)/2)`.
 
-Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `index`
+Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `bulk`
 thread pool to have more threads:
 
 [source,yaml]
 --------------------------------------------------
 thread_pool:
-    index:
+    bulk:
         size: 30
 --------------------------------------------------
 
@@ -89,7 +83,7 @@ full, it will abort the request.
 [source,yaml]
 --------------------------------------------------
 thread_pool:
-    index:
+    bulk:
         size: 30
         queue_size: 1000
 --------------------------------------------------

+ 2 - 4
rest-api-spec/src/main/resources/rest-api-spec/test/cat.thread_pool/10_basic.yml

@@ -33,7 +33,7 @@
 
   - do:
       cat.thread_pool:
-          thread_pool_patterns: bulk,management,flush,index,generic,force_merge
+          thread_pool_patterns: bulk,management,flush,generic,force_merge
           h: id,name,active
           v: true
 
@@ -44,7 +44,6 @@
                    \S+\s+ flush       \s+ \d+    \n
                    \S+\s+ force_merge \s+ \d+    \n
                    \S+\s+ generic     \s+ \d+    \n
-                   \S+\s+ index       \s+ \d+    \n
                    \S+\s+ management  \s+ \d+    \n)+  $/
 
   - do:
@@ -72,12 +71,11 @@
 
   - do:
       cat.thread_pool:
-          thread_pool_patterns: bulk,index,search
+          thread_pool_patterns: bulk,search
           size: ""
 
   - match:
       $body: |
                /  #node_name     name       active     queue     rejected
                ^  (\S+       \s+ bulk   \s+ \d+    \s+ \d+   \s+ \d+      \n
-                   \S+       \s+ index  \s+ \d+    \s+ \d+   \s+ \d+      \n
                    \S+       \s+ search \s+ \d+    \s+ \d+   \s+ \d+      \n)+  $/

+ 1 - 1
server/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

@@ -46,7 +46,7 @@ public class TransportDeleteAction extends TransportSingleItemBulkWriteAction<De
                                  ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                  TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
         super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
-            actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX,
+            actionFilters, indexNameExpressionResolver, DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.BULK,
             bulkAction, shardBulkAction);
     }
 

+ 1 - 1
server/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

@@ -54,7 +54,7 @@ public class TransportIndexAction extends TransportSingleItemBulkWriteAction<Ind
                                 ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                 TransportBulkAction bulkAction, TransportShardBulkAction shardBulkAction) {
         super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
-            actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX,
+            actionFilters, indexNameExpressionResolver, IndexRequest::new, IndexRequest::new, ThreadPool.Names.BULK,
             bulkAction, shardBulkAction);
     }
 

+ 1 - 1
server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

@@ -86,7 +86,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
 
     @Override
     protected String executor() {
-        return ThreadPool.Names.INDEX;
+        return ThreadPool.Names.BULK;
     }
 
     @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java

@@ -48,7 +48,7 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
     }
 
     protected int applyHardSizeLimit(final Settings settings, final String name) {
-        if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
+        if (name.equals(ThreadPool.Names.BULK)) {
             return 1 + EsExecutors.numberOfProcessors(settings);
         } else {
             return Integer.MAX_VALUE;

+ 3 - 41
server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java

@@ -49,20 +49,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
      * @param queueSize the size of the backing queue, -1 for unbounded
      */
     FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize) {
-        this(settings, name, size, queueSize, false);
-    }
-
-    /**
-     * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
-     *
-     * @param settings   the node-level settings
-     * @param name       the name of the executor
-     * @param size       the fixed number of threads
-     * @param queueSize  the size of the backing queue, -1 for unbounded
-     * @param deprecated whether or not the thread pool is deprecated
-     */
-    FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final boolean deprecated) {
-        this(settings, name, size, queueSize, "thread_pool." + name, deprecated);
+        this(settings, name, size, queueSize, "thread_pool." + name);
     }
 
     /**
@@ -75,41 +62,16 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
      * @param prefix    the prefix for the settings keys
      */
     public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix) {
-        this(settings, name, size, queueSize, prefix, false);
-    }
-
-    /**
-     * Construct a fixed executor builder.
-     *
-     * @param settings  the node-level settings
-     * @param name      the name of the executor
-     * @param size      the fixed number of threads
-     * @param queueSize the size of the backing queue, -1 for unbounded
-     * @param prefix    the prefix for the settings keys
-     */
-    private FixedExecutorBuilder(
-            final Settings settings,
-            final String name,
-            final int size,
-            final int queueSize,
-            final String prefix,
-            final boolean deprecated) {
         super(name);
         final String sizeKey = settingsKey(prefix, "size");
-        final Setting.Property[] properties;
-        if (deprecated) {
-            properties = new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated};
-        } else {
-            properties = new Setting.Property[]{Setting.Property.NodeScope};
-        }
         this.sizeSetting =
                 new Setting<>(
                         sizeKey,
                         s -> Integer.toString(size),
                         s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
-                        properties);
+                        Setting.Property.NodeScope);
         final String queueSizeKey = settingsKey(prefix, "queue_size");
-        this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties);
+        this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
     }
 
     @Override

+ 0 - 3
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -69,7 +69,6 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         public static final String LISTENER = "listener";
         public static final String GET = "get";
         public static final String ANALYZE = "analyze";
-        public static final String INDEX = "index";
         public static final String BULK = "bulk";
         public static final String SEARCH = "search";
         public static final String MANAGEMENT = "management";
@@ -126,7 +125,6 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         map.put(Names.LISTENER, ThreadPoolType.FIXED);
         map.put(Names.GET, ThreadPoolType.FIXED);
         map.put(Names.ANALYZE, ThreadPoolType.FIXED);
-        map.put(Names.INDEX, ThreadPoolType.FIXED);
         map.put(Names.BULK, ThreadPoolType.FIXED);
         map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
         map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
@@ -172,7 +170,6 @@ public class ThreadPool extends AbstractComponent implements Scheduler, Closeabl
         final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
         final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
         builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
-        builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
         builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
         builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
         builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));

+ 2 - 2
server/src/test/java/org/elasticsearch/action/RejectionActionIT.java

@@ -45,8 +45,8 @@ public class RejectionActionIT extends ESIntegTestCase {
                 .put(super.nodeSettings(nodeOrdinal))
                 .put("thread_pool.search.size", 1)
                 .put("thread_pool.search.queue_size", 1)
-                .put("thread_pool.index.size", 1)
-                .put("thread_pool.index.queue_size", 1)
+                .put("thread_pool.bulk.size", 1)
+                .put("thread_pool.bulk.queue_size", 1)
                 .put("thread_pool.get.size", 1)
                 .put("thread_pool.get.queue_size", 1)
                 .build();

+ 1 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -124,7 +124,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
             super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
                     TransportBulkActionIngestTests.this.clusterService,
                     null, null, null, new ActionFilters(Collections.emptySet()), null,
-                    IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, bulkAction, null);
+                    IndexRequest::new, IndexRequest::new, ThreadPool.Names.BULK, bulkAction, null);
         }
 
         @Override

+ 1 - 1
server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -542,7 +542,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                                 listener.onFailure(e);
                             }
                         },
-                        ThreadPool.Names.INDEX, request);
+                        ThreadPool.Names.BULK, request);
             }
 
             @Override

+ 12 - 12
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -284,14 +284,14 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(indexShard);
         assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
         try {
-            indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, "");
+            indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, "");
             fail("we should not be able to increment anymore");
         } catch (IndexShardClosedException e) {
             // expected
         }
         try {
             indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null,
-                ThreadPool.Names.INDEX, "");
+                ThreadPool.Names.BULK, "");
             fail("we should not be able to increment anymore");
         } catch (IndexShardClosedException e) {
             // expected
@@ -302,7 +302,7 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShard indexShard = newShard(false);
         expectThrows(IndexShardNotStartedException.class, () ->
             indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100),
-                SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX, ""));
+                SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.BULK, ""));
         closeShards(indexShard);
     }
 
@@ -342,7 +342,7 @@ public class IndexShardTests extends IndexShardTestCase {
                                 throw new RuntimeException(e);
                             }
                         },
-                        ThreadPool.Names.INDEX, id);
+                        ThreadPool.Names.BULK, id);
             });
             thread.start();
             threads.add(thread);
@@ -393,7 +393,7 @@ public class IndexShardTests extends IndexShardTestCase {
                                 throw new RuntimeException(e);
                             }
                         },
-                        ThreadPool.Names.INDEX, id);
+                        ThreadPool.Names.BULK, id);
             });
             thread.start();
             delayedThreads.add(thread);
@@ -589,7 +589,7 @@ public class IndexShardTests extends IndexShardTestCase {
         assertEquals(0, indexShard.getActiveOperationsCount());
         if (indexShard.routingEntry().isRelocationTarget() == false) {
             try {
-                indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX, "");
+                indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.BULK, "");
                 fail("shard shouldn't accept operations as replica");
             } catch (IllegalStateException ignored) {
 
@@ -608,14 +608,14 @@ public class IndexShardTests extends IndexShardTestCase {
 
     private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
         PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
-        indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, "");
+        indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, "");
         return fut.get();
     }
 
     private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
         throws ExecutionException, InterruptedException {
         PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
-        indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX, "");
+        indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.BULK, "");
         return fut.get();
     }
 
@@ -663,7 +663,7 @@ public class IndexShardTests extends IndexShardTestCase {
         if (shardRouting.primary() == false) {
             final IllegalStateException e =
                     expectThrows(IllegalStateException.class,
-                        () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, ""));
+                        () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.BULK, ""));
             assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
         }
 
@@ -700,7 +700,7 @@ public class IndexShardTests extends IndexShardTestCase {
             };
 
             indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired,
-                ThreadPool.Names.INDEX, "");
+                ThreadPool.Names.BULK, "");
 
             assertFalse(onResponse.get());
             assertTrue(onFailure.get());
@@ -1020,7 +1020,7 @@ public class IndexShardTests extends IndexShardTestCase {
                             latch.countDown();
                         }
                     },
-                    ThreadPool.Names.INDEX, "");
+                    ThreadPool.Names.BULK, "");
         };
 
         final long firstIncrement = 1 + (randomBoolean() ? 0 : 1);
@@ -1381,7 +1381,7 @@ public class IndexShardTests extends IndexShardTestCase {
                     super.onResponse(releasable);
                 }
             };
-            shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX, "i_" + i);
+            shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.BULK, "i_" + i);
             onLockAcquiredActions.add(onLockAcquired);
         }
 

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java

@@ -113,7 +113,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
         SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
         final ShardId shardId = shard.shardId();
         PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
-        shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, "");
+        shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.BULK, "");
         try (Releasable operationLock = fut.get()) {
             SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
             flushService.attemptSyncedFlush(shardId, listener);

+ 0 - 4
server/src/test/java/org/elasticsearch/threadpool/FixedThreadPoolTests.java

@@ -85,10 +85,6 @@ public class FixedThreadPoolTests extends ESThreadPoolTestCase {
 
             assertThat(counter, equalTo(rejections));
             assertThat(stats(threadPool, threadPoolName).getRejected(), equalTo(rejections));
-
-            if (threadPoolName.equals(ThreadPool.Names.INDEX)) {
-                assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.index.queue_size", "thread_pool.index.size"});
-            }
         } finally {
             terminateThreadPoolIfNeeded(threadPool);
         }

+ 4 - 17
server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java

@@ -60,8 +60,7 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
         }
     }
 
-    public void testIndexingThreadPoolsMaxSize() throws InterruptedException {
-        final String name = randomFrom(Names.BULK, Names.INDEX);
+    public void testBulkThreadPoolsMaxSize() {
         final int maxSize = 1 + EsExecutors.numberOfProcessors(Settings.EMPTY);
         final int tooBig = randomIntBetween(1 + maxSize, Integer.MAX_VALUE);
 
@@ -74,7 +73,7 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
                     try {
                         tp = new ThreadPool(Settings.builder()
                             .put("node.name", "testIndexingThreadPoolsMaxSize")
-                            .put("thread_pool." + name + ".size", tooBig)
+                            .put("thread_pool." + Names.BULK + ".size", tooBig)
                             .build());
                     } finally {
                         terminateThreadPoolIfNeeded(tp);
@@ -84,15 +83,11 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
         assertThat(
             initial,
             hasToString(containsString(
-                "Failed to parse value [" + tooBig + "] for setting [thread_pool." + name + ".size] must be ")));
-
-        if (name.equals(Names.INDEX)) {
-            assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.size" });
-        }
+                "Failed to parse value [" + tooBig + "] for setting [thread_pool." + Names.BULK + ".size] must be ")));
     }
 
     private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
-        if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
+        if (name.equals(ThreadPool.Names.BULK)) {
             return Math.min(size, EsExecutors.numberOfProcessors(settings));
         } else {
             return size;
@@ -120,10 +115,6 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
             assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
             // keep alive does not apply to fixed thread pools
             assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L));
-
-            if (threadPoolName.equals(Names.INDEX)) {
-                assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.size" });
-            }
         } finally {
             terminateThreadPoolIfNeeded(threadPool);
         }
@@ -179,10 +170,6 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
             latch.await(3, TimeUnit.SECONDS); // if this throws then ThreadPool#shutdownNow did not interrupt
             assertThat(oldExecutor.isShutdown(), equalTo(true));
             assertThat(oldExecutor.isTerminating() || oldExecutor.isTerminated(), equalTo(true));
-
-            if (threadPoolName.equals(Names.INDEX)) {
-                assertSettingDeprecationsAndWarnings(new String[] { "thread_pool.index.queue_size" });
-            }
         } finally {
             terminateThreadPoolIfNeeded(threadPool);
         }