浏览代码

Deprecate the listener thread pool (#53266)

The listener thread pool is being removed from use in the server
codebase. This commit deprecates configuring the listener thread pool.
Jason Tedor 5 年之前
父节点
当前提交
51d2b3141b

+ 7 - 0
docs/reference/migration/migrate_8_0/settings.asciidoc

@@ -37,3 +37,10 @@ processors. As this leads to more context switches and more threads but without
 an increase in the number of physical CPUs on which to schedule these additional
 an increase in the number of physical CPUs on which to schedule these additional
 threads, the `node.processors` setting is now bounded by the number of available
 threads, the `node.processors` setting is now bounded by the number of available
 processors.
 processors.
+
+[float]
+==== `thread_pool.listener.size` and `thread_pool.listener.queue_size` have been deprecated
+
+The listener thread pool is no longer used internally by Elasticsearch.
+Therefore, these settings have been deprecated. You can safely remove these
+settings from the configuration of your nodes.

+ 57 - 7
server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java

@@ -51,7 +51,28 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
      * @param trackEWMA whether to track the exponentially weighted moving average of the task execution time
      * @param trackEWMA whether to track the exponentially weighted moving average of the task execution time
      */
      */
     FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final boolean trackEWMA) {
     FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final boolean trackEWMA) {
-        this(settings, name, size, queueSize, "thread_pool." + name, trackEWMA);
+        this(settings, name, size, queueSize, trackEWMA, false);
+    }
+
+    /**
+     * Construct a fixed executor builder; the settings will have the key prefix "thread_pool." followed by the executor name.
+     *
+     * @param settings   the node-level settings
+     * @param name       the name of the executor
+     * @param size       the fixed number of threads
+     * @param queueSize  the size of the backing queue, -1 for unbounded
+     * @param trackEWMA  whether to track the exponentially weighted moving average of the task execution time
+     * @param deprecated whether or not the thread pool is deprecated
+     */
+    FixedExecutorBuilder(
+        final Settings settings,
+        final String name,
+        final int size,
+        final int queueSize,
+        final boolean trackEWMA,
+        final boolean deprecated
+    ) {
+        this(settings, name, size, queueSize, "thread_pool." + name, trackEWMA, deprecated);
     }
     }
 
 
     /**
     /**
@@ -66,16 +87,45 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
      */
      */
     public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix,
     public FixedExecutorBuilder(final Settings settings, final String name, final int size, final int queueSize, final String prefix,
                                 final boolean trackEWMA) {
                                 final boolean trackEWMA) {
+        this(settings, name, size, queueSize, prefix, trackEWMA, false);
+    }
+
+    /**
+     * Construct a fixed executor builder.
+     *
+     * @param settings   the node-level settings
+     * @param name       the name of the executor
+     * @param size       the fixed number of threads
+     * @param queueSize  the size of the backing queue, -1 for unbounded
+     * @param prefix     the prefix for the settings keys
+     * @param trackEWMA  whether to track the exponentially weighted moving average of the task execution time
+     * @param deprecated whether or not the thread pool is deprecated
+     */
+    public FixedExecutorBuilder(
+        final Settings settings,
+        final String name,
+        final int size,
+        final int queueSize,
+        final String prefix,
+        final boolean trackEWMA,
+        final boolean deprecated
+    ) {
         super(name);
         super(name);
         final String sizeKey = settingsKey(prefix, "size");
         final String sizeKey = settingsKey(prefix, "size");
+        final Setting.Property[] properties;
+        if (deprecated) {
+            properties = new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated};
+        } else {
+            properties = new Setting.Property[]{Setting.Property.NodeScope};
+        }
         this.sizeSetting =
         this.sizeSetting =
-                new Setting<>(
-                        sizeKey,
-                        s -> Integer.toString(size),
-                        s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
-                        Setting.Property.NodeScope);
+            new Setting<>(
+                sizeKey,
+                s -> Integer.toString(size),
+                s -> Setting.parseInt(s, 1, applyHardSizeLimit(settings, name), sizeKey),
+                properties);
         final String queueSizeKey = settingsKey(prefix, "queue_size");
         final String queueSizeKey = settingsKey(prefix, "queue_size");
-        this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, Setting.Property.NodeScope);
+        this.queueSizeSetting = Setting.intSetting(queueSizeKey, queueSize, properties);
         this.trackEWMA = trackEWMA;
         this.trackEWMA = trackEWMA;
     }
     }
 
 

+ 2 - 2
server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -68,7 +68,7 @@ public class ThreadPool implements Scheduler {
     public static class Names {
     public static class Names {
         public static final String SAME = "same";
         public static final String SAME = "same";
         public static final String GENERIC = "generic";
         public static final String GENERIC = "generic";
-        public static final String LISTENER = "listener";
+        @Deprecated public static final String LISTENER = "listener";
         public static final String GET = "get";
         public static final String GET = "get";
         public static final String ANALYZE = "analyze";
         public static final String ANALYZE = "analyze";
         public static final String WRITE = "write";
         public static final String WRITE = "write";
@@ -172,7 +172,7 @@ public class ThreadPool implements Scheduler {
         builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
         builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
         // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
         // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
         // the assumption here is that the listeners should be very lightweight on the listeners side
         // the assumption here is that the listeners should be very lightweight on the listeners side
-        builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, false));
+        builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, false, true));
         builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
         builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
         builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
         builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
         builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
         builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

+ 5 - 0
server/src/test/java/org/elasticsearch/threadpool/FixedThreadPoolTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.threadpool;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.threadpool.ThreadPool.Names;
 
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
 
 
@@ -88,6 +89,10 @@ public class FixedThreadPoolTests extends ESThreadPoolTestCase {
         } finally {
         } finally {
             terminateThreadPoolIfNeeded(threadPool);
             terminateThreadPoolIfNeeded(threadPool);
         }
         }
+
+        if (Names.LISTENER.equals(threadPoolName)) {
+            assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.queue_size", "thread_pool.listener.size"});
+        }
     }
     }
 
 
 }
 }

+ 8 - 0
server/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java

@@ -118,6 +118,10 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
         } finally {
         } finally {
             terminateThreadPoolIfNeeded(threadPool);
             terminateThreadPoolIfNeeded(threadPool);
         }
         }
+
+        if (Names.LISTENER.equals(threadPoolName)) {
+            assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.size"});
+        }
     }
     }
 
 
     public void testScalingExecutorType() throws InterruptedException {
     public void testScalingExecutorType() throws InterruptedException {
@@ -173,6 +177,10 @@ public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
         } finally {
         } finally {
             terminateThreadPoolIfNeeded(threadPool);
             terminateThreadPoolIfNeeded(threadPool);
         }
         }
+
+        if (Names.LISTENER.equals(threadPoolName)) {
+            assertSettingDeprecationsAndWarnings(new String[]{"thread_pool.listener.queue_size"});
+        }
     }
     }
 
 
     public void testCustomThreadPool() throws Exception {
     public void testCustomThreadPool() throws Exception {