|  | @@ -11,6 +11,8 @@ package org.elasticsearch.common.util.concurrent;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionListener;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionRunnable;
 | 
	
		
			
				|  |  | +import org.elasticsearch.action.support.SubscribableListener;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.Strings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.settings.Setting;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.settings.Settings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.unit.Processors;
 | 
	
	
		
			
				|  | @@ -25,9 +27,12 @@ import java.util.concurrent.CountDownLatch;
 | 
	
		
			
				|  |  |  import java.util.concurrent.CyclicBarrier;
 | 
	
		
			
				|  |  |  import java.util.concurrent.Executor;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ExecutorService;
 | 
	
		
			
				|  |  | +import java.util.concurrent.Executors;
 | 
	
		
			
				|  |  | +import java.util.concurrent.Semaphore;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ThreadFactory;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ThreadPoolExecutor;
 | 
	
		
			
				|  |  |  import java.util.concurrent.TimeUnit;
 | 
	
		
			
				|  |  | +import java.util.concurrent.TimeoutException;
 | 
	
		
			
				|  |  |  import java.util.concurrent.atomic.AtomicBoolean;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT;
 | 
	
	
		
			
				|  | @@ -295,6 +300,7 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |              try {
 | 
	
		
			
				|  |  |                  executor.execute(new Runnable() {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |                      @Override
 | 
	
		
			
				|  |  |                      public void run() {
 | 
	
		
			
				|  |  |                          // Doesn't matter is going to be rejected
 | 
	
	
		
			
				|  | @@ -757,7 +763,7 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testScalingWithEmptyCore() {
 | 
	
		
			
				|  |  | -        testScalingWithEmptyCore(
 | 
	
		
			
				|  |  | +        testScalingWithEmptyCoreAndMaxSingleThread(
 | 
	
		
			
				|  |  |              EsExecutors.newScaling(
 | 
	
		
			
				|  |  |                  getTestName(),
 | 
	
		
			
				|  |  |                  0,
 | 
	
	
		
			
				|  | @@ -772,7 +778,7 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testScalingWithEmptyCoreAndKeepAlive() {
 | 
	
		
			
				|  |  | -        testScalingWithEmptyCore(
 | 
	
		
			
				|  |  | +        testScalingWithEmptyCoreAndMaxSingleThread(
 | 
	
		
			
				|  |  |              EsExecutors.newScaling(
 | 
	
		
			
				|  |  |                  getTestName(),
 | 
	
		
			
				|  |  |                  0,
 | 
	
	
		
			
				|  | @@ -787,9 +793,7 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testScalingWithEmptyCoreAndLargerMaxSize() {
 | 
	
		
			
				|  |  | -        // TODO currently the reproduction of the starvation bug does not work if max pool size > 1
 | 
	
		
			
				|  |  | -        // https://github.com/elastic/elasticsearch/issues/124867
 | 
	
		
			
				|  |  | -        testScalingWithEmptyCore(
 | 
	
		
			
				|  |  | +        testScalingWithEmptyCoreAndMaxMultipleThreads(
 | 
	
		
			
				|  |  |              EsExecutors.newScaling(
 | 
	
		
			
				|  |  |                  getTestName(),
 | 
	
		
			
				|  |  |                  0,
 | 
	
	
		
			
				|  | @@ -804,9 +808,7 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
 | 
	
		
			
				|  |  | -        // TODO currently the reproduction of the starvation bug does not work if max pool size > 1
 | 
	
		
			
				|  |  | -        // https://github.com/elastic/elasticsearch/issues/124867
 | 
	
		
			
				|  |  | -        testScalingWithEmptyCore(
 | 
	
		
			
				|  |  | +        testScalingWithEmptyCoreAndMaxMultipleThreads(
 | 
	
		
			
				|  |  |              EsExecutors.newScaling(
 | 
	
		
			
				|  |  |                  getTestName(),
 | 
	
		
			
				|  |  |                  0,
 | 
	
	
		
			
				|  | @@ -821,10 +823,8 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
 | 
	
		
			
				|  |  | -        // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
 | 
	
		
			
				|  |  | -        // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
 | 
	
		
			
				|  |  |          // the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
 | 
	
		
			
				|  |  | -        testScalingWithEmptyCore(
 | 
	
		
			
				|  |  | +        testScalingWithEmptyCoreAndMaxSingleThread(
 | 
	
		
			
				|  |  |              new EsThreadPoolExecutor(
 | 
	
		
			
				|  |  |                  getTestName(),
 | 
	
		
			
				|  |  |                  0,
 | 
	
	
		
			
				|  | @@ -840,10 +840,8 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
 | 
	
		
			
				|  |  | -        // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
 | 
	
		
			
				|  |  | -        // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
 | 
	
		
			
				|  |  |          // the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
 | 
	
		
			
				|  |  | -        testScalingWithEmptyCore(
 | 
	
		
			
				|  |  | +        testScalingWithEmptyCoreAndMaxSingleThread(
 | 
	
		
			
				|  |  |              new EsThreadPoolExecutor(
 | 
	
		
			
				|  |  |                  getTestName(),
 | 
	
		
			
				|  |  |                  0,
 | 
	
	
		
			
				|  | @@ -858,11 +856,13 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private void testScalingWithEmptyCore(EsThreadPoolExecutor executor) {
 | 
	
		
			
				|  |  | +    private void testScalingWithEmptyCoreAndMaxSingleThread(EsThreadPoolExecutor testSubject) {
 | 
	
		
			
				|  |  |          try {
 | 
	
		
			
				|  |  | +            final var keepAliveNanos = testSubject.getKeepAliveTime(TimeUnit.NANOSECONDS);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |              class Task extends AbstractRunnable {
 | 
	
		
			
				|  |  | -                private int remaining;
 | 
	
		
			
				|  |  |                  private final CountDownLatch doneLatch;
 | 
	
		
			
				|  |  | +                private int remaining;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                  Task(int iterations, CountDownLatch doneLatch) {
 | 
	
		
			
				|  |  |                      this.remaining = iterations;
 | 
	
	
		
			
				|  | @@ -879,29 +879,108 @@ public class EsExecutorsTests extends ESTestCase {
 | 
	
		
			
				|  |  |                      if (--remaining == 0) {
 | 
	
		
			
				|  |  |                          doneLatch.countDown();
 | 
	
		
			
				|  |  |                      } else {
 | 
	
		
			
				|  |  | -                        logger.trace("--> remaining [{}]", remaining);
 | 
	
		
			
				|  |  | -                        final long keepAliveNanos = executor.getKeepAliveTime(TimeUnit.NANOSECONDS);
 | 
	
		
			
				|  |  |                          new Thread(() -> {
 | 
	
		
			
				|  |  |                              if (keepAliveNanos > 0) {
 | 
	
		
			
				|  |  | -                                final var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-10_000, 10_000);
 | 
	
		
			
				|  |  | -                                while (System.nanoTime() < targetNanoTime) {
 | 
	
		
			
				|  |  | -                                    Thread.yield();
 | 
	
		
			
				|  |  | -                                }
 | 
	
		
			
				|  |  | +                                waitUntilKeepAliveTime(keepAliveNanos);
 | 
	
		
			
				|  |  |                              }
 | 
	
		
			
				|  |  | -                            executor.execute(Task.this);
 | 
	
		
			
				|  |  | +                            testSubject.execute(Task.this);
 | 
	
		
			
				|  |  |                          }).start();
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              for (int i = 0; i < 20; i++) {
 | 
	
		
			
				|  |  | -                logger.trace("--> attempt [{}]", i);
 | 
	
		
			
				|  |  |                  final var doneLatch = new CountDownLatch(1);
 | 
	
		
			
				|  |  | -                executor.execute(new Task(between(1, 500), doneLatch));
 | 
	
		
			
				|  |  | +                testSubject.execute(new Task(between(1, 500), doneLatch));
 | 
	
		
			
				|  |  |                  safeAwait(doneLatch, TimeValue.ONE_MINUTE);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          } finally {
 | 
	
		
			
				|  |  | -            ThreadPool.terminate(executor, 1, TimeUnit.SECONDS);
 | 
	
		
			
				|  |  | +            ThreadPool.terminate(testSubject, 1, TimeUnit.SECONDS);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private void testScalingWithEmptyCoreAndMaxMultipleThreads(EsThreadPoolExecutor testSubject) {
 | 
	
		
			
				|  |  | +        final var keepAliveNanos = testSubject.getKeepAliveTime(TimeUnit.NANOSECONDS);
 | 
	
		
			
				|  |  | +        // Use max pool size with one additional scheduler task if a keep alive time is set.
 | 
	
		
			
				|  |  | +        final var schedulerTasks = testSubject.getMaximumPoolSize() + (keepAliveNanos > 0 ? 1 : 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        class TaskScheduler {
 | 
	
		
			
				|  |  | +            final SubscribableListener<Void> result = new SubscribableListener<>();
 | 
	
		
			
				|  |  | +            final ExecutorService scheduler;
 | 
	
		
			
				|  |  | +            final CyclicBarrier cyclicBarrier;
 | 
	
		
			
				|  |  | +            final Semaphore taskCompletions;
 | 
	
		
			
				|  |  | +            private int remaining;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            TaskScheduler(ExecutorService scheduler, int iterations) {
 | 
	
		
			
				|  |  | +                this.scheduler = scheduler;
 | 
	
		
			
				|  |  | +                this.taskCompletions = new Semaphore(0);
 | 
	
		
			
				|  |  | +                this.cyclicBarrier = new CyclicBarrier(schedulerTasks, () -> remaining--);
 | 
	
		
			
				|  |  | +                this.remaining = iterations;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            public void start() {
 | 
	
		
			
				|  |  | +                // The scheduler tasks are running on the dedicated scheduler thread pool. Each task submits
 | 
	
		
			
				|  |  | +                // a test task on the EsThreadPoolExecutor (`testSubject`) releasing one `taskCompletions` permit.
 | 
	
		
			
				|  |  | +                final Runnable schedulerTask = () -> {
 | 
	
		
			
				|  |  | +                    try {
 | 
	
		
			
				|  |  | +                        while (remaining > 0) {
 | 
	
		
			
				|  |  | +                            // Wait for all scheduler threads to be ready for the next attempt.
 | 
	
		
			
				|  |  | +                            var first = cyclicBarrier.await(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS) == schedulerTasks - 1;
 | 
	
		
			
				|  |  | +                            if (first && keepAliveNanos > 0) {
 | 
	
		
			
				|  |  | +                                // The task submitted by the first scheduler task (after reaching the keep alive time) is the task
 | 
	
		
			
				|  |  | +                                // that might starve without any worker available unless an additional worker probe is submitted.
 | 
	
		
			
				|  |  | +                                waitUntilKeepAliveTime(keepAliveNanos);
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                            // Test EsThreadPoolExecutor by submitting a task that releases one permit.
 | 
	
		
			
				|  |  | +                            testSubject.execute(taskCompletions::release);
 | 
	
		
			
				|  |  | +                            if (first) {
 | 
	
		
			
				|  |  | +                                // Let the first scheduler task (by arrival on the barrier) wait for all permits.
 | 
	
		
			
				|  |  | +                                var success = taskCompletions.tryAcquire(
 | 
	
		
			
				|  |  | +                                    schedulerTasks,
 | 
	
		
			
				|  |  | +                                    SAFE_AWAIT_TIMEOUT.millis(),
 | 
	
		
			
				|  |  | +                                    TimeUnit.MILLISECONDS
 | 
	
		
			
				|  |  | +                                );
 | 
	
		
			
				|  |  | +                                if (success == false) {
 | 
	
		
			
				|  |  | +                                    var msg = Strings.format(
 | 
	
		
			
				|  |  | +                                        "timed out waiting for [%s] of [%s] tasks to complete [queue size: %s, workers: %s] ",
 | 
	
		
			
				|  |  | +                                        schedulerTasks - taskCompletions.availablePermits(),
 | 
	
		
			
				|  |  | +                                        schedulerTasks,
 | 
	
		
			
				|  |  | +                                        testSubject.getQueue().size(),
 | 
	
		
			
				|  |  | +                                        testSubject.getPoolSize()
 | 
	
		
			
				|  |  | +                                    );
 | 
	
		
			
				|  |  | +                                    result.onFailure(new TimeoutException(msg));
 | 
	
		
			
				|  |  | +                                    return;
 | 
	
		
			
				|  |  | +                                }
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +                    } catch (Exception e) {
 | 
	
		
			
				|  |  | +                        result.onFailure(e);
 | 
	
		
			
				|  |  | +                        return;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    result.onResponse(null);
 | 
	
		
			
				|  |  | +                };
 | 
	
		
			
				|  |  | +                // Run scheduler tasks on the dedicated scheduler thread pool.
 | 
	
		
			
				|  |  | +                for (int i = 0; i < schedulerTasks; i++) {
 | 
	
		
			
				|  |  | +                    scheduler.execute(schedulerTask);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        try (var scheduler = Executors.newFixedThreadPool(schedulerTasks)) {
 | 
	
		
			
				|  |  | +            for (int i = 0; i < 100; i++) {
 | 
	
		
			
				|  |  | +                TaskScheduler taskScheduler = new TaskScheduler(scheduler, between(10, 200));
 | 
	
		
			
				|  |  | +                taskScheduler.start();
 | 
	
		
			
				|  |  | +                safeAwait(taskScheduler.result);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        } finally {
 | 
	
		
			
				|  |  | +            ThreadPool.terminate(testSubject, 1, TimeUnit.SECONDS);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private void waitUntilKeepAliveTime(long keepAliveNanos) {
 | 
	
		
			
				|  |  | +        var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-1_000, 1_000);
 | 
	
		
			
				|  |  | +        while (System.nanoTime() < targetNanoTime) {
 | 
	
		
			
				|  |  | +            Thread.yield();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  }
 |