Parcourir la source

Add node.processors setting in favor of processors (#45855)

This commit namespaces the existing processors setting under the "node"
namespace. In doing so, we deprecate the existing processors setting in
favor of node.processors.
Jason Tedor il y a 6 ans
Parent
commit
d05101b9e5

+ 1 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

@@ -152,7 +152,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
     public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
                                      NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
         super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
-        Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
+        Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
 
         this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
         this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);

+ 1 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

@@ -112,7 +112,7 @@ public class Netty4Transport extends TcpTransport {
                            PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
                            CircuitBreakerService circuitBreakerService) {
         super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
-        Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
+        Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
         this.workerCount = WORKER_COUNT.get(settings);
 
         // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one

BIN
plugins/repository-s3/lib/aws-java-sdk-s3-1.11.562.jar


+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -390,6 +390,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             Client.CLIENT_TYPE_SETTING_S,
             ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
             EsExecutors.PROCESSORS_SETTING,
+            EsExecutors.NODE_PROCESSORS_SETTING,
             ThreadContext.DEFAULT_HEADERS_SETTING,
             Loggers.LOG_DEFAULT_LEVEL_SETTING,
             Loggers.LOG_LEVEL_SETTING,

+ 9 - 0
server/src/main/java/org/elasticsearch/common/settings/Setting.java

@@ -1053,6 +1053,15 @@ public class Setting<T> implements ToXContentObject {
         return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key), properties);
     }
 
+    public static Setting<Integer> intSetting(
+        final String key,
+        final Setting<Integer> fallbackSetting,
+        final int minValue,
+        final int maxValue,
+        final Property... properties) {
+        return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, maxValue, key), properties);
+    }
+
     public static Setting<Integer> intSetting(String key, Setting<Integer> fallbackSetting, int minValue, Validator<Integer> validator,
                                               Property... properties) {
         return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw, (s) -> parseInt(s, minValue, key),validator,

+ 13 - 4
server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

@@ -46,14 +46,23 @@ import java.util.stream.Collectors;
 
 public class EsExecutors {
 
-    /**
-     * Setting to manually set the number of available processors. This setting is used to adjust thread pool sizes per node.
-     */
     public static final Setting<Integer> PROCESSORS_SETTING = Setting.intSetting(
         "processors",
         Runtime.getRuntime().availableProcessors(),
         1,
         Runtime.getRuntime().availableProcessors(),
+        Property.Deprecated,
+        Property.NodeScope);
+
+    /**
+     * Setting to manually set the number of available processors. This setting is used to adjust thread pool sizes per node.
+     */
+    // TODO: when removing "processors" setting, the default value is Runtime.getRuntime().availableProcessors()
+    public static final Setting<Integer> NODE_PROCESSORS_SETTING = Setting.intSetting(
+        "node.processors",
+        PROCESSORS_SETTING,
+        1,
+        Runtime.getRuntime().availableProcessors(),
         Property.NodeScope);
 
     /**
@@ -65,7 +74,7 @@ public class EsExecutors {
      * @return the number of available processors
      */
     public static int numberOfProcessors(final Settings settings) {
-        return PROCESSORS_SETTING.get(settings);
+        return NODE_PROCESSORS_SETTING.get(settings);
     }
 
     public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory,

+ 1 - 1
server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java

@@ -203,7 +203,7 @@ public class ClusterStatsIT extends ESIntegTestCase {
 
     public void testAllocatedProcessors() throws Exception {
         // start one node with 7 processors.
-        internalCluster().startNode(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build());
+        internalCluster().startNode(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
         waitForNodes(1);
 
         ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();

+ 22 - 5
server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java

@@ -19,10 +19,12 @@
 
 package org.elasticsearch.common.util.concurrent;
 
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 import org.hamcrest.Matcher;
 
+import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -389,15 +391,30 @@ public class EsExecutorsTests extends ESTestCase {
         }
     }
 
+    public void testNodeProcessorsBound() {
+        runProcessorsBoundTest(EsExecutors.NODE_PROCESSORS_SETTING);
+    }
+
     public void testProcessorsBound() {
+        runProcessorsBoundTest(EsExecutors.PROCESSORS_SETTING);
+    }
+
+    private void runProcessorsBoundTest(final Setting<Integer> processorsSetting) {
         final int available = Runtime.getRuntime().availableProcessors();
         final int processors = randomIntBetween(available + 1, Integer.MAX_VALUE);
-        final Settings settings = Settings.builder().put("processors", processors).build();
+        final Settings settings = Settings.builder().put(processorsSetting.getKey(), processors).build();
         final IllegalArgumentException e =
-            expectThrows(IllegalArgumentException.class, () -> EsExecutors.PROCESSORS_SETTING.get(settings));
-        assertThat(
-            e,
-            hasToString(containsString("Failed to parse value [" + processors + "] for setting [processors] must be <= " + available)));
+            expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
+        final String expected = String.format(
+            Locale.ROOT,
+            "Failed to parse value [%d] for setting [%s] must be <= %d",
+            processors,
+            processorsSetting.getKey(),
+            available);
+        assertThat(e, hasToString(containsString(expected)));
+        if (processorsSetting.getProperties().contains(Setting.Property.Deprecated)) {
+            assertSettingDeprecationsAndWarnings(new Setting<?>[]{processorsSetting});
+        }
     }
 
 }

+ 2 - 2
server/src/test/java/org/elasticsearch/index/MergeSchedulerSettingsTests.java

@@ -31,7 +31,7 @@ import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 
-import static org.elasticsearch.common.util.concurrent.EsExecutors.PROCESSORS_SETTING;
+import static org.elasticsearch.common.util.concurrent.EsExecutors.NODE_PROCESSORS_SETTING;
 import static org.elasticsearch.index.IndexSettingsTests.newIndexMeta;
 import static org.elasticsearch.index.MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING;
 import static org.elasticsearch.index.MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING;
@@ -139,7 +139,7 @@ public class MergeSchedulerSettingsTests extends ESTestCase {
             builder.put(MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount);
         }
         if (numProc != -1) {
-            builder.put(PROCESSORS_SETTING.getKey(), numProc);
+            builder.put(NODE_PROCESSORS_SETTING.getKey(), numProc);
         }
         return newIndexMeta("index", builder.build());
     }

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java

@@ -70,7 +70,7 @@ public class IndicesServiceCloseTests extends ESTestCase {
             .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
             .put(Node.NODE_NAME_SETTING.getKey(), nodeName)
             .put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
-            .put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
+            .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
             .put("transport.type", getTestTransportType())
             .put(Node.NODE_DATA_SETTING.getKey(), true)
             .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())

+ 2 - 2
server/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java

@@ -114,8 +114,8 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
 
     public void testAllocatedProcessors() throws Exception {
         List<String> nodesIds = internalCluster().startNodes(
-                        Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 3).build(),
-                        Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 6).build()
+                        Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 3).build(),
+                        Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 6).build()
                 );
 
         final String node_1 = nodesIds.get(0);

+ 1 - 1
server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java

@@ -54,7 +54,7 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
         if (randomBoolean()) {
             processors = randomIntBetween(1, availableProcessors);
             maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors);
-            builder.put("processors", processors);
+            builder.put("node.processors", processors);
         } else {
             maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, availableProcessors);
             processors = availableProcessors;

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java

@@ -197,7 +197,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
             .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
             .put(Node.NODE_NAME_SETTING.getKey(), nodeName)
             .put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
-            .put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
+            .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
             .put("transport.type", getTestTransportType())
             .put(TransportSettings.PORT.getKey(), ESTestCase.getPortRange())
             .put(Node.NODE_DATA_SETTING.getKey(), true)

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -450,7 +450,9 @@ public final class InternalTestCluster extends TestCluster {
             builder.put(SearchService.DEFAULT_KEEPALIVE_SETTING.getKey(), timeValueSeconds(100 + random.nextInt(5 * 60)).getStringRep());
         }
 
-        builder.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1 + random.nextInt(Math.min(4, Runtime.getRuntime().availableProcessors())));
+        builder.put(
+            EsExecutors.NODE_PROCESSORS_SETTING.getKey(),
+            1 + random.nextInt(Math.min(4, Runtime.getRuntime().availableProcessors())));
         if (random.nextBoolean()) {
             if (random.nextBoolean()) {
                 builder.put("indices.fielddata.cache.size", 1 + random.nextInt(1000), ByteSizeUnit.MB);