Browse Source

Add support for floating point node.processors setting (#89281)

This commit adds support for floating point node.processors setting.
This is useful when the nodes run in an environment where the CPU
time assigned to the ES node process is limited (i.e. using cgroups).
With this change, the system would be able to size the thread pools
accordingly, in this case it would round up the provided setting
to the closest integer.
Francisco Fernández Castaño 3 years ago
parent
commit
837a8d7a6e

+ 3 - 1
docs/reference/modules/threadpool.asciidoc

@@ -177,7 +177,9 @@ thread_pool:
 The number of processors is automatically detected, and the thread pool settings
 are automatically set based on it. In some cases it can be useful to override
 the number of detected processors. This can be done by explicitly setting the
-`node.processors` setting.
+`node.processors` setting. This setting accepts floating point numbers, this
+can be useful in environments where the Elasticsearch nodes are configured
+to run with CPU limits, such as cpu shares or quota under `Cgroups`.
 
 [source,yaml]
 --------------------------------------------------

+ 1 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

@@ -158,7 +158,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
             clusterSettings,
             tracer
         );
-        Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
+        Netty4Utils.setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
         NettyAllocator.logAllocatorDescriptionIfNeeded();
         this.sharedGroupFactory = sharedGroupFactory;
 

+ 1 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

@@ -107,7 +107,7 @@ public class Netty4Transport extends TcpTransport {
         SharedGroupFactory sharedGroupFactory
     ) {
         super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
-        Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
+        Netty4Utils.setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
         NettyAllocator.logAllocatorDescriptionIfNeeded();
         this.sharedGroupFactory = sharedGroupFactory;
 

+ 1 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java

@@ -114,7 +114,7 @@ public class Netty4Utils {
     public static Recycler<BytesRef> createRecycler(Settings settings) {
         // If this method is called by super ctor the processors will not be set. Accessing NettyAllocator initializes netty's internals
         // setting the processors. We must do it ourselves first just in case.
-        setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
+        setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
         return NettyAllocator.getRecycler();
     }
 }

+ 24 - 6
server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

@@ -37,13 +37,31 @@ public class EsExecutors {
     /**
      * Setting to manually control the number of allocated processors. This setting is used to adjust thread pool sizes per node. The
      * default value is {@link Runtime#availableProcessors()} but should be manually controlled if not all processors on the machine are
-     * available to Elasticsearch (e.g., because of CPU limits).
+     * available to Elasticsearch (e.g., because of CPU limits). Note that this setting accepts floating point processors.
+     * If a rounded number is needed, always use {@link EsExecutors#allocatedProcessors(Settings)}.
      */
-    public static final Setting<Integer> NODE_PROCESSORS_SETTING = Setting.intSetting(
+    public static final Setting<Double> NODE_PROCESSORS_SETTING = new Setting<>(
         "node.processors",
-        Runtime.getRuntime().availableProcessors(),
-        1,
-        Runtime.getRuntime().availableProcessors(),
+        Double.toString(Runtime.getRuntime().availableProcessors()),
+        textValue -> {
+            double numberOfProcessors = Double.parseDouble(textValue);
+            if (Double.isNaN(numberOfProcessors) || Double.isInfinite(numberOfProcessors)) {
+                String err = "Failed to parse value [" + textValue + "] for setting [node.processors]";
+                throw new IllegalArgumentException(err);
+            }
+
+            if (numberOfProcessors <= 0.0) {
+                String err = "Failed to parse value [" + textValue + "] for setting [node.processors] must be > 0";
+                throw new IllegalArgumentException(err);
+            }
+
+            final int maxNumberOfProcessors = Runtime.getRuntime().availableProcessors();
+            if (numberOfProcessors > maxNumberOfProcessors) {
+                String err = "Failed to parse value [" + textValue + "] for setting [node.processors] must be <= " + maxNumberOfProcessors;
+                throw new IllegalArgumentException(err);
+            }
+            return numberOfProcessors;
+        },
         Property.NodeScope
     );
 
@@ -55,7 +73,7 @@ public class EsExecutors {
      * @return the number of allocated processors
      */
     public static int allocatedProcessors(final Settings settings) {
-        return NODE_PROCESSORS_SETTING.get(settings);
+        return (int) Math.ceil(NODE_PROCESSORS_SETTING.get(settings));
     }
 
     public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(

+ 57 - 3
server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java

@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 
 /**
@@ -437,14 +438,14 @@ public class EsExecutorsTests extends ESTestCase {
     }
 
     public void testNodeProcessorsBound() {
-        final Setting<Integer> processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING;
+        final Setting<Double> processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING;
         final int available = Runtime.getRuntime().availableProcessors();
-        final int processors = randomIntBetween(available + 1, Integer.MAX_VALUE);
+        final double processors = randomDoubleBetween(available + Math.ulp(available), Float.MAX_VALUE, true);
         final Settings settings = Settings.builder().put(processorsSetting.getKey(), processors).build();
         final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
         final String expected = String.format(
             Locale.ROOT,
-            "Failed to parse value [%d] for setting [%s] must be <= %d",
+            "Failed to parse value [%s] for setting [%s] must be <= %d",
             processors,
             processorsSetting.getKey(),
             available
@@ -452,4 +453,57 @@ public class EsExecutorsTests extends ESTestCase {
         assertThat(e, hasToString(containsString(expected)));
     }
 
+    public void testNodeProcessorsIsRoundedUpWhenUsingFloats() {
+        assertThat(
+            EsExecutors.allocatedProcessors(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), Double.MIN_VALUE).build()),
+            is(equalTo(1))
+        );
+
+        assertThat(
+            EsExecutors.allocatedProcessors(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 0.2).build()),
+            is(equalTo(1))
+        );
+
+        assertThat(
+            EsExecutors.allocatedProcessors(Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), 1.2).build()),
+            is(equalTo(2))
+        );
+
+        assertThat(
+            EsExecutors.allocatedProcessors(
+                Settings.builder().put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), Runtime.getRuntime().availableProcessors()).build()
+            ),
+            is(equalTo(Runtime.getRuntime().availableProcessors()))
+        );
+    }
+
+    public void testNodeProcessorsFloatValidation() {
+        final Setting<Double> processorsSetting = EsExecutors.NODE_PROCESSORS_SETTING;
+
+        {
+            final Settings settings = Settings.builder().put(processorsSetting.getKey(), 0.0).build();
+            expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
+        }
+
+        {
+            final Settings settings = Settings.builder().put(processorsSetting.getKey(), Double.NaN).build();
+            expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
+        }
+
+        {
+            final Settings settings = Settings.builder().put(processorsSetting.getKey(), Double.POSITIVE_INFINITY).build();
+            expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
+        }
+
+        {
+            final Settings settings = Settings.builder().put(processorsSetting.getKey(), Double.NEGATIVE_INFINITY).build();
+            expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
+        }
+
+        {
+            final Settings settings = Settings.builder().put(processorsSetting.getKey(), -1.5).build();
+            expectThrows(IllegalArgumentException.class, () -> processorsSetting.get(settings));
+        }
+    }
+
 }