|
@@ -91,7 +91,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
}
|
|
|
|
|
|
public enum ThreadPoolType {
|
|
|
- CACHED("cached"),
|
|
|
DIRECT("direct"),
|
|
|
FIXED("fixed"),
|
|
|
SCALING("scaling");
|
|
@@ -125,12 +124,12 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public static Map<String, ThreadPoolType> THREAD_POOL_TYPES;
|
|
|
+ public static final Map<String, ThreadPoolType> THREAD_POOL_TYPES;
|
|
|
|
|
|
static {
|
|
|
HashMap<String, ThreadPoolType> map = new HashMap<>();
|
|
|
map.put(Names.SAME, ThreadPoolType.DIRECT);
|
|
|
- map.put(Names.GENERIC, ThreadPoolType.CACHED);
|
|
|
+ map.put(Names.GENERIC, ThreadPoolType.SCALING);
|
|
|
map.put(Names.LISTENER, ThreadPoolType.FIXED);
|
|
|
map.put(Names.GET, ThreadPoolType.FIXED);
|
|
|
map.put(Names.INDEX, ThreadPoolType.FIXED);
|
|
@@ -153,33 +152,67 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
executorSettings.put(name, settings);
|
|
|
}
|
|
|
|
|
|
- private static class ExecutorSettingsBuilder {
|
|
|
- Map<String, String> settings = new HashMap<>();
|
|
|
+ private static abstract class ExecutorSettingsBuilder<T extends ExecutorSettingsBuilder<T>> {
|
|
|
|
|
|
- public ExecutorSettingsBuilder(String name) {
|
|
|
- settings.put("name", name);
|
|
|
- settings.put("type", THREAD_POOL_TYPES.get(name).getType());
|
|
|
+ private final Settings.Builder builder;
|
|
|
+
|
|
|
+ protected ExecutorSettingsBuilder(String name, ThreadPoolType threadPoolType) {
|
|
|
+ if (THREAD_POOL_TYPES.get(name) != threadPoolType) {
|
|
|
+ throw new IllegalArgumentException("thread pool [" + name + "] must be of type [" + threadPoolType + "]");
|
|
|
+ }
|
|
|
+ builder = Settings.builder();
|
|
|
+ builder.put("name", name);
|
|
|
+ builder.put("type", threadPoolType.getType());
|
|
|
}
|
|
|
|
|
|
- public ExecutorSettingsBuilder size(int availableProcessors) {
|
|
|
- return add("size", Integer.toString(availableProcessors));
|
|
|
+ public T keepAlive(String keepAlive) {
|
|
|
+ return add("keep_alive", keepAlive);
|
|
|
}
|
|
|
|
|
|
- public ExecutorSettingsBuilder queueSize(int queueSize) {
|
|
|
- return add("queue_size", Integer.toString(queueSize));
|
|
|
+ public T queueSize(int queueSize) {
|
|
|
+ return add("queue_size", queueSize);
|
|
|
}
|
|
|
|
|
|
- public ExecutorSettingsBuilder keepAlive(String keepAlive) {
|
|
|
- return add("keep_alive", keepAlive);
|
|
|
+ protected T add(String setting, int value) {
|
|
|
+ return add(setting, Integer.toString(value));
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ protected T add(String setting, String value) {
|
|
|
+ builder.put(setting, value);
|
|
|
+ @SuppressWarnings("unchecked") final T executor = (T)this;
|
|
|
+ return executor;
|
|
|
+ }
|
|
|
+
|
|
|
+ public final Settings build() { return builder.build(); }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class FixedExecutorSettingsBuilder extends ExecutorSettingsBuilder<FixedExecutorSettingsBuilder> {
|
|
|
+
|
|
|
+ public FixedExecutorSettingsBuilder(String name) {
|
|
|
+ super(name, ThreadPoolType.FIXED);
|
|
|
}
|
|
|
|
|
|
- private ExecutorSettingsBuilder add(String key, String value) {
|
|
|
- settings.put(key, value);
|
|
|
- return this;
|
|
|
+ public FixedExecutorSettingsBuilder size(int size) {
|
|
|
+ return add("size", Integer.toString(size));
|
|
|
}
|
|
|
|
|
|
- public Settings build() {
|
|
|
- return Settings.builder().put(settings).build();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class ScalingExecutorSettingsBuilder extends ExecutorSettingsBuilder<ScalingExecutorSettingsBuilder> {
|
|
|
+
|
|
|
+ public ScalingExecutorSettingsBuilder(String name) {
|
|
|
+ super(name, ThreadPoolType.SCALING);
|
|
|
+ }
|
|
|
+
|
|
|
+ public ScalingExecutorSettingsBuilder min(int min) {
|
|
|
+ return add("min", min);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public ScalingExecutorSettingsBuilder size(int size) {
|
|
|
+ return add("size", size);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -215,25 +248,26 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
validate(groupSettings);
|
|
|
|
|
|
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
|
|
|
- int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
|
|
|
- int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
|
|
|
+ int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
|
|
|
+ int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
|
|
|
Map<String, Settings> defaultExecutorTypeSettings = new HashMap<>();
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GENERIC).size(4 * availableProcessors).keepAlive("30s"));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m"));
|
|
|
+ int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
|
|
|
+ add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.GENERIC).min(4).size(genericThreadPoolMax).keepAlive("30s"));
|
|
|
+ add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200));
|
|
|
+ add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50));
|
|
|
+ add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000));
|
|
|
+ add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
|
|
|
+ add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.MANAGEMENT).min(1).size(5).keepAlive("5m"));
|
|
|
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
|
|
|
// the assumption here is that the listeners should be very lightweight on the listeners side
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FLUSH).size(halfProcMaxAt5).keepAlive("5m"));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.REFRESH).size(halfProcMaxAt10).keepAlive("5m"));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.WARMER).size(halfProcMaxAt5).keepAlive("5m"));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SNAPSHOT).size(halfProcMaxAt5).keepAlive("5m"));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m"));
|
|
|
- add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m"));
|
|
|
+ add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10));
|
|
|
+ add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FLUSH).min(1).size(halfProcMaxAt5).keepAlive("5m"));
|
|
|
+ add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.REFRESH).min(1).size(halfProcMaxAt10).keepAlive("5m"));
|
|
|
+ add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.WARMER).min(1).size(halfProcMaxAt5).keepAlive("5m"));
|
|
|
+ add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.SNAPSHOT).min(1).size(halfProcMaxAt5).keepAlive("5m"));
|
|
|
+ add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
|
|
|
+ add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).min(1).size(availableProcessors * 2).keepAlive("5m"));
|
|
|
+ add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).min(1).size(availableProcessors * 2).keepAlive("5m"));
|
|
|
|
|
|
this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings);
|
|
|
|
|
@@ -251,9 +285,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
}
|
|
|
|
|
|
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
|
|
|
- if (!executors.get(Names.GENERIC).info.getThreadPoolType().equals(ThreadPoolType.CACHED)) {
|
|
|
- throw new IllegalArgumentException("generic thread pool must be of type cached");
|
|
|
- }
|
|
|
this.executors = unmodifiableMap(executors);
|
|
|
this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
|
|
|
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
|
|
@@ -447,49 +478,23 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name);
|
|
|
if (ThreadPoolType.DIRECT == threadPoolType) {
|
|
|
if (previousExecutorHolder != null) {
|
|
|
- logger.debug("updating thread_pool [{}], type [{}]", name, type);
|
|
|
+ logger.debug("updating thread pool [{}], type [{}]", name, type);
|
|
|
} else {
|
|
|
- logger.debug("creating thread_pool [{}], type [{}]", name, type);
|
|
|
+ logger.debug("creating thread pool [{}], type [{}]", name, type);
|
|
|
}
|
|
|
return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType));
|
|
|
- } else if (ThreadPoolType.CACHED == threadPoolType) {
|
|
|
- if (!Names.GENERIC.equals(name)) {
|
|
|
- throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]");
|
|
|
- }
|
|
|
- TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
|
|
|
- if (previousExecutorHolder != null) {
|
|
|
- if (ThreadPoolType.CACHED == previousInfo.getThreadPoolType()) {
|
|
|
- TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
|
|
|
- if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
|
|
|
- logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
|
|
|
- ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
|
|
|
- return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, -1, -1, updatedKeepAlive, null));
|
|
|
- }
|
|
|
- return previousExecutorHolder;
|
|
|
- }
|
|
|
- if (previousInfo.getKeepAlive() != null) {
|
|
|
- defaultKeepAlive = previousInfo.getKeepAlive();
|
|
|
- }
|
|
|
- }
|
|
|
- TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
|
|
|
- if (previousExecutorHolder != null) {
|
|
|
- logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
|
|
|
- } else {
|
|
|
- logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
|
|
|
- }
|
|
|
- Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
|
|
|
- return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null));
|
|
|
} else if (ThreadPoolType.FIXED == threadPoolType) {
|
|
|
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
|
|
|
SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null));
|
|
|
|
|
|
if (previousExecutorHolder != null) {
|
|
|
+ assert previousInfo != null;
|
|
|
if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) {
|
|
|
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
|
|
|
if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) {
|
|
|
int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax()));
|
|
|
if (previousInfo.getMax() != updatedSize) {
|
|
|
- logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
|
|
|
+ logger.debug("updating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
|
|
|
// if you think this code is crazy: that's because it is!
|
|
|
if (updatedSize > previousInfo.getMax()) {
|
|
|
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
|
|
@@ -511,20 +516,24 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
|
|
|
int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize));
|
|
|
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
|
|
|
- logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
|
|
|
+ logger.debug("creating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
|
|
|
Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory, threadContext);
|
|
|
return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize));
|
|
|
} else if (ThreadPoolType.SCALING == threadPoolType) {
|
|
|
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
|
|
|
int defaultMin = defaultSettings.getAsInt("min", 1);
|
|
|
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
|
|
|
+ final Integer queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", null));
|
|
|
+ if (queueSize != null) {
|
|
|
+ throw new IllegalArgumentException("thread pool [" + name + "] of type scaling can not have its queue re-sized but was [" + queueSize + "]");
|
|
|
+ }
|
|
|
if (previousExecutorHolder != null) {
|
|
|
if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) {
|
|
|
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
|
|
|
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
|
|
|
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
|
|
|
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) {
|
|
|
- logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
|
|
|
+ logger.debug("updating thread pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
|
|
|
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
|
|
|
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
|
|
|
}
|
|
@@ -552,9 +561,9 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
int min = settings.getAsInt("min", defaultMin);
|
|
|
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize));
|
|
|
if (previousExecutorHolder != null) {
|
|
|
- logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
|
|
|
+ logger.debug("updating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
|
|
|
} else {
|
|
|
- logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
|
|
|
+ logger.debug("creating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
|
|
|
}
|
|
|
Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
|
|
|
return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null));
|
|
@@ -577,6 +586,32 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
return size;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Constrains a value between minimum and maximum values
|
|
|
+ * (inclusive).
|
|
|
+ *
|
|
|
+ * @param value the value to constrain
|
|
|
+ * @param min the minimum acceptable value
|
|
|
+ * @param max the maximum acceptable value
|
|
|
+ * @return min if value is less than min, max if value is greater
|
|
|
+ * than value, otherwise value
|
|
|
+ */
|
|
|
+ static int boundedBy(int value, int min, int max) {
|
|
|
+ return Math.min(max, Math.max(min, value));
|
|
|
+ }
|
|
|
+
|
|
|
+ static int halfNumberOfProcessorsMaxFive(int numberOfProcessors) {
|
|
|
+ return boundedBy((numberOfProcessors + 1) / 2, 1, 5);
|
|
|
+ }
|
|
|
+
|
|
|
+ static int halfNumberOfProcessorsMaxTen(int numberOfProcessors) {
|
|
|
+ return boundedBy((numberOfProcessors + 1) / 2, 1, 10);
|
|
|
+ }
|
|
|
+
|
|
|
+ static int twiceNumberOfProcessors(int numberOfProcessors) {
|
|
|
+ return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE);
|
|
|
+ }
|
|
|
+
|
|
|
private void updateSettings(Settings settings) {
|
|
|
Map<String, Settings> groupSettings = settings.getAsGroups();
|
|
|
if (groupSettings.isEmpty()) {
|
|
@@ -969,4 +1004,5 @@ public class ThreadPool extends AbstractComponent implements Closeable {
|
|
|
public ThreadContext getThreadContext() {
|
|
|
return threadContext;
|
|
|
}
|
|
|
+
|
|
|
}
|