|
@@ -158,6 +158,7 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
|
|
|
|
|
public void testScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws InterruptedException {
|
|
|
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
|
|
|
+ final int min = "generic".equals(threadPoolName) ? 4 : 1;
|
|
|
final Settings settings =
|
|
|
Settings.builder()
|
|
|
.put("threadpool." + threadPoolName + ".size", 128)
|
|
@@ -165,21 +166,33 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
|
|
|
.build();
|
|
|
runScalingThreadPoolTest(settings, ((clusterSettings, threadPool) -> {
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
|
+ final CountDownLatch taskLatch = new CountDownLatch(128);
|
|
|
for (int i = 0; i < 128; i++) {
|
|
|
threadPool.executor(threadPoolName).execute(() -> {
|
|
|
try {
|
|
|
latch.await();
|
|
|
+ taskLatch.countDown();
|
|
|
} catch (final InterruptedException e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
- final int active = stats(threadPool, threadPoolName).getThreads();
|
|
|
- assertThat(active, equalTo(128));
|
|
|
+ assertThat(stats(threadPool, threadPoolName).getThreads(), equalTo(128));
|
|
|
latch.countDown();
|
|
|
+ // this while loop is the core of this test; if threads
|
|
|
+ // are correctly idled down by the pool, the number of
|
|
|
+ // threads in the pool will drop to the min for the pool
|
|
|
+ // but if threads are not correctly idled down by the pool,
|
|
|
+ // this test will just timeout waiting for them to idle
|
|
|
+ // down
|
|
|
do {
|
|
|
spinForAtLeastOneMillisecond();
|
|
|
- } while (stats(threadPool, threadPoolName).getThreads() > 4);
|
|
|
+ } while (stats(threadPool, threadPoolName).getThreads() > min);
|
|
|
+ try {
|
|
|
+ taskLatch.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
assertThat(stats(threadPool, threadPoolName).getCompleted(), equalTo(128L));
|
|
|
}));
|
|
|
}
|