|
@@ -157,15 +157,15 @@ public class ThreadPool implements Scheduler {
|
|
|
assert Node.NODE_NAME_SETTING.exists(settings);
|
|
|
|
|
|
final Map<String, ExecutorBuilder> builders = new HashMap<>();
|
|
|
- final int availableProcessors = EsExecutors.numberOfProcessors(settings);
|
|
|
- final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
|
|
|
- final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
|
|
|
- final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
|
|
|
+ final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
|
|
|
+ final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
|
|
|
+ final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
|
|
|
+ final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
|
|
|
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
|
|
|
- builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200, false));
|
|
|
- builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000, false));
|
|
|
+ builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 200, false));
|
|
|
+ builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000, false));
|
|
|
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false));
|
|
|
- builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, true));
|
|
|
+ builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, true));
|
|
|
builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true));
|
|
|
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
|
|
|
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
|
|
@@ -175,10 +175,10 @@ public class ThreadPool implements Scheduler {
|
|
|
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
|
|
|
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
|
|
|
builders.put(Names.FETCH_SHARD_STARTED,
|
|
|
- new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
|
|
|
+ new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
|
|
|
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
|
|
|
builders.put(Names.FETCH_SHARD_STORE,
|
|
|
- new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
|
|
|
+ new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5)));
|
|
|
for (final ExecutorBuilder<?> builder : customBuilders) {
|
|
|
if (builders.containsKey(builder.name())) {
|
|
|
throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
|
|
@@ -424,20 +424,20 @@ public class ThreadPool implements Scheduler {
|
|
|
return Math.min(max, Math.max(min, value));
|
|
|
}
|
|
|
|
|
|
- static int halfNumberOfProcessorsMaxFive(int numberOfProcessors) {
|
|
|
- return boundedBy((numberOfProcessors + 1) / 2, 1, 5);
|
|
|
+ static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
|
|
|
+ return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
|
|
|
}
|
|
|
|
|
|
- static int halfNumberOfProcessorsMaxTen(int numberOfProcessors) {
|
|
|
- return boundedBy((numberOfProcessors + 1) / 2, 1, 10);
|
|
|
+ static int halfAllocatedProcessorsMaxTen(final int allocatedProcessors) {
|
|
|
+ return boundedBy((allocatedProcessors + 1) / 2, 1, 10);
|
|
|
}
|
|
|
|
|
|
- static int twiceNumberOfProcessors(int numberOfProcessors) {
|
|
|
- return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE);
|
|
|
+ static int twiceAllocatedProcessors(final int allocatedProcessors) {
|
|
|
+ return boundedBy(2 * allocatedProcessors, 2, Integer.MAX_VALUE);
|
|
|
}
|
|
|
|
|
|
- public static int searchThreadPoolSize(int availableProcessors) {
|
|
|
- return ((availableProcessors * 3) / 2) + 1;
|
|
|
+ public static int searchThreadPoolSize(final int allocatedProcessors) {
|
|
|
+ return ((allocatedProcessors * 3) / 2) + 1;
|
|
|
}
|
|
|
|
|
|
class ThreadedRunnable implements Runnable {
|