Browse Source

[Transform] implement throttling in indexer (#55011)

implement throttling in async-indexer used by rollup and transform. The added
docs_per_second parameter is used to calculate a delay before the next
search request is send. With re-throttle its possible to change the parameter
at runtime. When stopping a running job, its ensured that despite throttling
the indexer stops in reasonable time. This change contains the groundwork, but
does not expose the new functionality.

relates #54862
Hendrik Muhs 5 years ago
parent
commit
72a43dd538

+ 206 - 19
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -14,9 +14,13 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.RunOnce;
+import org.elasticsearch.threadpool.Scheduler;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -38,15 +42,70 @@ import java.util.concurrent.atomic.AtomicReference;
 public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends IndexerJobStats> {
     private static final Logger logger = LogManager.getLogger(AsyncTwoPhaseIndexer.class.getName());
 
+    // max time to wait for during throttling
+    private static final TimeValue MAX_THROTTLE_WAIT_TIME = TimeValue.timeValueHours(1);
+    // min time to trigger delayed execution, this avoids scheduling tasks with super short amount of time
+    private static final TimeValue MIN_THROTTLE_WAIT_TIME = TimeValue.timeValueMillis(10);
+
+    private final ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(
+        this::onSearchResponse,
+        this::finishWithSearchFailure
+    );
+
     private final JobStats stats;
 
     private final AtomicReference<IndexerState> state;
     private final AtomicReference<JobPosition> position;
-    private final Executor executor;
+    private final ThreadPool threadPool;
+    private final String executorName;
+
+    // throttling implementation
+    private volatile float currentMaxDocsPerSecond;
+    private volatile long lastSearchStartTimeNanos = 0;
+    private volatile long lastDocCount = 0;
+    private volatile ScheduledRunnable scheduledNextSearch;
+
+    /**
+     * Task wrapper for throttled execution, we need this wrapper in order to cancel and re-issue scheduled searches
+     */
+    class ScheduledRunnable {
+        private final ThreadPool threadPool;
+        private final String executorName;
+        private final Runnable command;
+        private Scheduler.ScheduledCancellable scheduled;
+
+        ScheduledRunnable(ThreadPool threadPool, String executorName, TimeValue delay, Runnable command) {
+            this.threadPool = threadPool;
+            this.executorName = executorName;
+
+            // with wrapping the command in RunOnce we ensure the command isn't executed twice, e.g. if the
+            // future is already running and cancel returns true
+            this.command = new RunOnce(command);
+            this.scheduled = threadPool.schedule(() -> { command.run(); }, delay, executorName);
+        }
 
-    protected AsyncTwoPhaseIndexer(Executor executor, AtomicReference<IndexerState> initialState,
-                                   JobPosition initialPosition, JobStats jobStats) {
-        this.executor = executor;
+        public void reschedule(TimeValue delay) {
+            // note: cancel return true if the runnable is currently executing
+            if (scheduled.cancel()) {
+                if (delay.duration() > 0) {
+                    scheduled = threadPool.schedule(() -> command.run(), delay, executorName);
+                } else {
+                    threadPool.executor(executorName).execute(() -> command.run());
+                }
+            }
+        }
+
+    }
+
+    protected AsyncTwoPhaseIndexer(
+        ThreadPool threadPool,
+        String executorName,
+        AtomicReference<IndexerState> initialState,
+        JobPosition initialPosition,
+        JobStats jobStats
+    ) {
+        this.threadPool = threadPool;
+        this.executorName = executorName;
         this.state = initialState;
         this.position = new AtomicReference<>(initialPosition);
         this.stats = jobStats;
@@ -96,7 +155,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
      * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
      */
     public synchronized IndexerState stop() {
-        return state.updateAndGet(previousState -> {
+        IndexerState indexerState = state.updateAndGet(previousState -> {
             if (previousState == IndexerState.INDEXING) {
                 return IndexerState.STOPPING;
             } else if (previousState == IndexerState.STARTED) {
@@ -105,6 +164,13 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
                 return previousState;
             }
         });
+
+        // a throttled search might be waiting to be executed, stop it
+        if (scheduledNextSearch != null) {
+            scheduledNextSearch.reschedule(TimeValue.ZERO);
+        }
+
+        return indexerState;
     }
 
     /**
@@ -152,11 +218,11 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
 
             if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
                 // fire off the search. Note this is async, the method will return from here
-                executor.execute(() -> {
+                threadPool.executor(executorName).execute(() -> {
                     onStart(now, ActionListener.wrap(r -> {
                         assert r != null;
                         if (r) {
-                            nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
+                            nextSearch();
                         } else {
                             onFinish(ActionListener.wrap(
                                 onFinishResponse -> doSaveState(finishAndSetState(), position.get(), () -> {}),
@@ -178,6 +244,34 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
         }
     }
 
+    /**
+     * Re-schedules the search request if necessary, this method can be called to apply a change
+     * in maximumRequestsPerSecond immediately
+     */
+    protected void rethrottle() {
+        // simple check if the setting has changed, ignores the call if it hasn't
+        if (getMaxDocsPerSecond() == currentMaxDocsPerSecond) {
+            return;
+        }
+
+        reQueueThrottledSearch();
+    }
+
+    // protected, so it can be overwritten by tests
+    protected long getTimeNanos() {
+        return System.nanoTime();
+    }
+
+    /**
+     * Called to get max docs per second. To be overwritten if
+     * throttling is implemented, the default -1 turns off throttling.
+     *
+     * @return a float with max docs per second, -1 if throttling is off
+     */
+    protected float getMaxDocsPerSecond() {
+        return -1;
+    }
+
     /**
      * Called to get the Id of the job, used for logging.
      *
@@ -196,9 +290,13 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
     /**
      * Called to build the next search request.
      *
+     * In case the indexer is throttled waitTimeInNanos can be used as hint for building a less resource hungry
+     * search request.
+     *
+     * @param waitTimeInNanos duration in nanoseconds the indexer has waited due to throttling.
      * @return SearchRequest to be passed to the search phase.
      */
-    protected abstract SearchRequest buildSearchRequest();
+    protected abstract SearchRequest buildSearchRequest(long waitTimeInNanos);
 
     /**
      * Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the
@@ -349,10 +447,15 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
             assert (searchResponse.getShardFailures().length == 0);
             stats.markStartProcessing();
             stats.incrementNumPages(1);
+
+            long numDocumentsBefore = stats.getNumDocuments();
             IterationResult<JobPosition> iterationResult = doProcess(searchResponse);
 
+            // record the number of documents returned to base throttling on the output
+            lastDocCount = stats.getNumDocuments() - numDocumentsBefore;
+
             if (iterationResult.isDone()) {
-                logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");
+                logger.debug("Finished indexing for job [{}], saving state and shutting down.", getJobId());
 
                 position.set(iterationResult.getPosition());
                 stats.markEndProcessing();
@@ -375,7 +478,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
                     // TODO we should check items in the response and move after accordingly to
                     // resume the failing buckets ?
                     if (bulkResponse.hasFailures()) {
-                        logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
+                        logger.warn("Error while attempting to bulk index documents: {}", bulkResponse.buildFailureMessage());
                     }
                     stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
 
@@ -396,8 +499,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
                     JobPosition newPosition = iterationResult.getPosition();
                     position.set(newPosition);
 
-                    ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
-                    nextSearch(listener);
+                    nextSearch();
                 } catch (Exception e) {
                     finishWithFailure(e);
                 }
@@ -409,26 +511,74 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
 
     private void onBulkResponse(BulkResponse response, JobPosition position) {
         stats.markEndIndexing();
+
+        // check if we should stop
+        if (checkState(getState()) == false) {
+            return;
+        }
+
         try {
-            ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
             // TODO probably something more intelligent than every-50 is needed
             if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
                 doSaveState(IndexerState.INDEXING, position, () -> {
-                    nextSearch(listener);
+                    nextSearch();
                 });
             } else {
-                nextSearch(listener);
+                nextSearch();
             }
         } catch (Exception e) {
             finishWithIndexingFailure(e);
         }
     }
 
-    private void nextSearch(ActionListener<SearchResponse> listener) {
+    protected void nextSearch() {
+        currentMaxDocsPerSecond = getMaxDocsPerSecond();
+        if (currentMaxDocsPerSecond > 0 && lastDocCount > 0) {
+            TimeValue executionDelay = calculateThrottlingDelay(
+                currentMaxDocsPerSecond,
+                lastDocCount,
+                lastSearchStartTimeNanos,
+                getTimeNanos()
+            );
+
+            if (executionDelay.duration() > 0) {
+                logger.debug(
+                    "throttling job [{}], wait for {} ({} {})",
+                    getJobId(),
+                    executionDelay,
+                    currentMaxDocsPerSecond,
+                    lastDocCount
+                );
+                scheduledNextSearch = new ScheduledRunnable(
+                    threadPool,
+                    executorName,
+                    executionDelay,
+                    () -> triggerNextSearch(executionDelay.getNanos())
+                );
+
+                // corner case: if for whatever reason stop() has been called meanwhile fast forward
+                if (getState().equals(IndexerState.STOPPING)) {
+                    scheduledNextSearch.reschedule(TimeValue.ZERO);
+                }
+                return;
+            }
+        }
+
+        triggerNextSearch(0L);
+    }
+
+    private void triggerNextSearch(long waitTimeInNanos) {
+        if (checkState(getState()) == false) {
+            return;
+        }
+
         stats.markStartSearch();
+        lastSearchStartTimeNanos = getTimeNanos();
+
         // ensure that partial results are not accepted and cause a search failure
-        SearchRequest searchRequest = buildSearchRequest().allowPartialSearchResults(false);
-        doNextSearch(searchRequest, listener);
+        SearchRequest searchRequest = buildSearchRequest(waitTimeInNanos).allowPartialSearchResults(false);
+
+        doNextSearch(searchRequest, searchResponseListener);
     }
 
     /**
@@ -461,4 +611,41 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
         }
     }
 
+    private synchronized void reQueueThrottledSearch() {
+        currentMaxDocsPerSecond = getMaxDocsPerSecond();
+
+        if (scheduledNextSearch != null) {
+            TimeValue executionDelay = calculateThrottlingDelay(
+                currentMaxDocsPerSecond,
+                lastDocCount,
+                lastSearchStartTimeNanos,
+                getTimeNanos()
+            );
+
+            logger.trace(
+                "[{}] rethrottling job, wait {} until next search",
+                getJobId(),
+                executionDelay
+            );
+            scheduledNextSearch.reschedule(executionDelay);
+        }
+    }
+
+    static TimeValue calculateThrottlingDelay(float docsPerSecond, long docCount, long startTimeNanos, long now) {
+        if (docsPerSecond <= 0) {
+            return TimeValue.ZERO;
+        }
+        float timeToWaitNanos = (docCount / docsPerSecond) * TimeUnit.SECONDS.toNanos(1);
+
+        // from timeToWaitNanos - (now - startTimeNanos)
+        TimeValue executionDelay = TimeValue.timeValueNanos(
+            Math.min(MAX_THROTTLE_WAIT_TIME.getNanos(), Math.max(0, (long) timeToWaitNanos + startTimeNanos - now))
+        );
+
+        if (executionDelay.compareTo(MIN_THROTTLE_WAIT_TIME) < 0) {
+            return TimeValue.ZERO;
+        }
+        return executionDelay;
+    }
+
 }

+ 216 - 28
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

@@ -16,18 +16,23 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchResponseSections;
 import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ExecutorBuilder;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -52,9 +57,9 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         private volatile int step;
         private final boolean stoppedBeforeFinished;
 
-        protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition,
-                              CountDownLatch latch, boolean stoppedBeforeFinished) {
-            super(executor, initialState, initialPosition, new MockJobStats());
+        protected MockIndexer(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
+                              Integer initialPosition, CountDownLatch latch, boolean stoppedBeforeFinished) {
+            super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
             this.latch = latch;
             this.stoppedBeforeFinished = stoppedBeforeFinished;
         }
@@ -81,7 +86,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected SearchRequest buildSearchRequest() {
+        protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
             assertThat(step, equalTo(1));
             ++step;
             return new SearchRequest();
@@ -114,8 +119,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
         @Override
         protected void doSaveState(IndexerState state, Integer position, Runnable next) {
-            int expectedStep = stoppedBeforeFinished ? 3 : 5;
-            assertThat(step, equalTo(expectedStep));
+            // for stop before finished we do not know if its stopped before are after the search
+            if (stoppedBeforeFinished == false) {
+                assertThat(step, equalTo(5));
+            }
             ++step;
             next.run();
         }
@@ -150,15 +157,29 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
     private class MockIndexerFiveRuns extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
 
+        private final long startTime;
+        private final CountDownLatch latch;
+        private volatile float maxDocsPerSecond;
+
         // counters
         private volatile boolean started = false;
+        private volatile boolean waitingForLatch = false;
         private volatile int searchRequests = 0;
         private volatile int searchOps = 0;
         private volatile int processOps = 0;
         private volatile int bulkOps = 0;
 
-        protected MockIndexerFiveRuns(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition) {
-            super(executor, initialState, initialPosition, new MockJobStats());
+        protected MockIndexerFiveRuns(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
+                Integer initialPosition, float maxDocsPerSecond, CountDownLatch latch) {
+            super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
+            startTime = System.nanoTime();
+            this.latch = latch;
+            this.maxDocsPerSecond = maxDocsPerSecond;
+        }
+
+        public void rethrottle(float maxDocsPerSecond) {
+            this.maxDocsPerSecond = maxDocsPerSecond;
+            rethrottle();
         }
 
         @Override
@@ -166,8 +187,16 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             return "mock_5_runs";
         }
 
+        @Override
+        protected float getMaxDocsPerSecond() {
+            return maxDocsPerSecond;
+        }
+
         @Override
         protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
+            // increment doc count for throttling
+            getStats().incrementNumDocuments(1000);
+
             ++processOps;
             if (processOps == 5) {
                 return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true);
@@ -180,7 +209,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected SearchRequest buildSearchRequest() {
+        protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
             ++searchRequests;
             return new SearchRequest();
         }
@@ -191,6 +220,23 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             listener.onResponse(true);
         }
 
+        private void awaitForLatch() {
+            if (latch == null) {
+                return;
+            }
+            try {
+                waitingForLatch = true;
+                latch.await(10, TimeUnit.SECONDS);
+                waitingForLatch = false;
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public boolean waitingForLatchCountDown() {
+            return waitingForLatch;
+        }
+
         @Override
         protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
             ++searchOps;
@@ -198,6 +244,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
                 new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
                 null, false, null, null, 1);
 
+            if (processOps == 3) {
+                awaitForLatch();
+            }
+
             nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null));
         }
 
@@ -232,6 +282,11 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         protected void onAbort() {
         }
 
+        @Override
+        protected long getTimeNanos() {
+            return startTime + searchOps * 50_000_000L;
+        }
+
         public void assertCounters() {
             assertTrue(started);
             assertEquals(5L, searchRequests);
@@ -247,8 +302,9 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         // test the execution order
         private int step;
 
-        protected MockIndexerThrowsFirstSearch(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition) {
-            super(executor, initialState, initialPosition, new MockJobStats());
+        protected MockIndexerThrowsFirstSearch(ThreadPool threadPool, String executorName, AtomicReference<IndexerState> initialState,
+                                               Integer initialPosition) {
+            super(threadPool, executorName, initialState, initialPosition, new MockJobStats());
         }
 
         @Override
@@ -263,7 +319,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected SearchRequest buildSearchRequest() {
+        protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
             assertThat(step, equalTo(1));
             ++step;
             return new SearchRequest();
@@ -321,12 +377,32 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         }
     }
 
+    private class MockThreadPool extends TestThreadPool {
+
+        private List<TimeValue> delays = new ArrayList<>();
+
+        MockThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
+            super(name, Settings.EMPTY, customBuilders);
+        }
+
+        @Override
+        public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
+            delays.add(delay);
+
+            return super.schedule(command, TimeValue.ZERO, executor);
+        }
+
+        public void assertCountersAndDelay(Collection<TimeValue> expectedDelays) {
+            assertThat(delays, equalTo(expectedDelays));
+        }
+    }
+
     public void testStateMachine() throws Exception {
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
             CountDownLatch countDownLatch = new CountDownLatch(1);
-            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
+            MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, false);
             indexer.start();
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -344,33 +420,32 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
     public void testStateMachineBrokenSearch() throws Exception {
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
 
         try {
-            MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
+            MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(threadPool, ThreadPool.Names.GENERIC, state, 2);
             indexer.start();
-
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
             assertBusy(() -> assertTrue(isFinished.get()), 10000, TimeUnit.SECONDS);
             assertThat(indexer.getStep(), equalTo(3));
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
     public void testStop_WhileIndexing() throws Exception {
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
             CountDownLatch countDownLatch = new CountDownLatch(1);
-            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true);
+            MockIndexer indexer = new MockIndexer(threadPool, ThreadPool.Names.GENERIC, state, 2, countDownLatch, true);
             indexer.start();
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -382,22 +457,135 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             assertBusy(() -> assertTrue(isStopped.get()));
             assertFalse(isFinished.get());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
     public void testFiveRuns() throws Exception {
+        doTestFiveRuns(-1, Collections.emptyList());
+    }
+
+    public void testFiveRunsThrottled100() throws Exception {
+        // expect throttling to kick in
+        doTestFiveRuns(100, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L, 9950L));
+    }
+
+    public void testFiveRunsThrottled1000() throws Exception {
+        // expect throttling to kick in
+        doTestFiveRuns(1_000, timeValueCollectionFromMilliseconds(950L, 950L, 950L, 950L));
+    }
+
+    public void testFiveRunsThrottled18000() throws Exception {
+        // expect throttling to not kick in due to min wait time
+        doTestFiveRuns(18_000, Collections.emptyList());
+    }
+
+    public void testFiveRunsThrottled1000000() throws Exception {
+        // docs per seconds is set high, so throttling does not kick in
+        doTestFiveRuns(1_000_000, Collections.emptyList());
+    }
+
+    public void doTestFiveRuns(float docsPerSecond, Collection<TimeValue> expectedDelays) throws Exception {
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final MockThreadPool threadPool = new MockThreadPool(getTestName());
         try {
-            MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (executor, state, 2);
+            MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond,
+                null);
             indexer.start();
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
             assertBusy(() -> assertTrue(isFinished.get()));
             indexer.assertCounters();
+            threadPool.assertCountersAndDelay(expectedDelays);
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
+
+    public void testFiveRunsRethrottle0_100() throws Exception {
+        doTestFiveRunsRethrottle(-1, 100, timeValueCollectionFromMilliseconds(9950L));
+    }
+
+    public void testFiveRunsRethrottle100_0() throws Exception {
+        doTestFiveRunsRethrottle(100, 0, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L));
+    }
+
+    public void testFiveRunsRethrottle100_1000() throws Exception {
+        doTestFiveRunsRethrottle(100, 1000, timeValueCollectionFromMilliseconds(9950L, 9950L, 9950L, 950L));
+    }
+
+    public void testFiveRunsRethrottle1000_100() throws Exception {
+        doTestFiveRunsRethrottle(1000, 100, timeValueCollectionFromMilliseconds(950L, 950L, 950L, 9950L));
+    }
+
+    public void doTestFiveRunsRethrottle(
+        float docsPerSecond,
+        float docsPerSecondRethrottle,
+        Collection<TimeValue> expectedDelays
+    ) throws Exception {
+        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+
+        final MockThreadPool threadPool = new MockThreadPool(getTestName());
+        try {
+            CountDownLatch latch = new CountDownLatch(1);
+            MockIndexerFiveRuns indexer = new MockIndexerFiveRuns (threadPool, ThreadPool.Names.GENERIC, state, 2, docsPerSecond,
+                latch);
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            // wait until the indexer starts waiting on the latch
+            assertBusy(() -> assertTrue(indexer.waitingForLatchCountDown()));
+            // rethrottle
+            indexer.rethrottle(docsPerSecondRethrottle);
+            latch.countDown();
+            // let it finish
+            assertBusy(() -> assertTrue(isFinished.get()));
+            indexer.assertCounters();
+            threadPool.assertCountersAndDelay(expectedDelays);
+        } finally {
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        }
+    }
+
+    public void testCalculateThrottlingDelay() {
+        // negative docs per second, throttling turned off
+        assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(-100, 100, 1_000, 1_000), equalTo(TimeValue.ZERO));
+
+        // negative docs per second, throttling turned off
+        assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(0, 100, 1_000, 1_000), equalTo(TimeValue.ZERO));
+
+        // 100 docs/s with 100 docs -> 1s delay
+        assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000, 1_000_000), equalTo(TimeValue.timeValueSeconds(1)));
+
+        // 100 docs/s with 100 docs, 200ms passed -> 800ms delay
+        assertThat(
+            AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 1_200_000_000L),
+            equalTo(TimeValue.timeValueMillis(800))
+        );
+
+        // 100 docs/s with 100 docs done, time passed -> no delay
+        assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 5_000_000_000L), equalTo(TimeValue.ZERO));
+
+        // 1_000_000 docs/s with 1 doc done, time passed -> no delay
+        assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(1_000_000, 1, 1_000_000_000L, 1_000_000_000L), equalTo(TimeValue.ZERO));
+
+        // max: 1 docs/s with 1_000_000 docs done, time passed -> no delay
+        assertThat(
+            AsyncTwoPhaseIndexer.calculateThrottlingDelay(1, 1_000_000, 1_000_000_000L, 1_000_000_000L),
+            equalTo(TimeValue.timeValueHours(1))
+        );
+
+        // min: 100 docs/s with 100 docs, 995ms passed -> no delay, because minimum not reached
+        assertThat(AsyncTwoPhaseIndexer.calculateThrottlingDelay(100, 100, 1_000_000_000L, 1_995_000_000L), equalTo(TimeValue.ZERO));
+
+    }
+
+    private static Collection<TimeValue> timeValueCollectionFromMilliseconds(Long... milliseconds) {
+        List<TimeValue> timeValues = new ArrayList<>();
+        for (Long m: milliseconds) {
+            timeValues.add(TimeValue.timeValueMillis(m));
+        }
+
+        return timeValues;
+    }
 }

+ 11 - 8
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

@@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilde
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.indexing.IterationResult;
@@ -48,7 +49,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.core.rollup.RollupField.formatFieldName;
@@ -65,26 +65,29 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
 
     /**
      * Ctr
-     * @param executor Executor to use to fire the first request of a background job.
+     * @param threadPool ThreadPool to use to fire the first request of a background job.
+     * @param executorName Name of the executor to use to fire the first request of a background job.
      * @param job The rollup job
      * @param initialState Initial state for the indexer
      * @param initialPosition The last indexed bucket of the task
      */
-    RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition) {
-        this(executor, job, initialState, initialPosition, new RollupIndexerJobStats());
+    RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
+                  Map<String, Object> initialPosition) {
+        this(threadPool, executorName, job, initialState, initialPosition, new RollupIndexerJobStats());
     }
 
     /**
      * Ctr
-     * @param executor Executor to use to fire the first request of a background job.
+     * @param threadPool ThreadPool to use to fire the first request of a background job.
+     * @param executorName Name of the executor to use to fire the first request of a background job.
      * @param job The rollup job
      * @param initialState Initial state for the indexer
      * @param initialPosition The last indexed bucket of the task
      * @param jobStats jobstats instance for collecting stats
      */
-    RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
+    RollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
                   Map<String, Object> initialPosition, RollupIndexerJobStats jobStats) {
-        super(executor, initialState, initialPosition, jobStats);
+        super(threadPool, executorName, initialState, initialPosition, jobStats);
         this.job = job;
         this.compositeBuilder = createCompositeBuilder(job.getConfig());
     }
@@ -110,7 +113,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
     }
 
     @Override
-    protected SearchRequest buildSearchRequest() {
+    protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
         final Map<String, Object> position = getPosition();
         SearchSourceBuilder searchSource = new SearchSourceBuilder()
                 .size(0)

+ 1 - 1
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

@@ -102,7 +102,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
 
         ClientRollupPageManager(RollupJob job, IndexerState initialState, Map<String, Object> initialPosition,
                                 Client client) {
-            super(threadPool.executor(ThreadPool.Names.GENERIC), job, new AtomicReference<>(initialState), initialPosition);
+            super(threadPool, ThreadPool.Names.GENERIC, job, new AtomicReference<>(initialState), initialPosition);
             this.client = client;
             this.job = job;
         }

+ 8 - 8
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java

@@ -47,6 +47,8 @@ import org.elasticsearch.search.aggregations.AggregatorTestCase;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
 import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
@@ -71,9 +73,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -508,14 +507,15 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase {
         IndexReader reader = DirectoryReader.open(dir);
         IndexSearcher searcher = new IndexSearcher(reader);
         String dateHistoField = config.getGroupConfig().getDateHistogram().getField();
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
+
         try {
             RollupJob job = new RollupJob(config, Collections.emptyMap());
-            final SyncRollupIndexer action = new SyncRollupIndexer(executor, job, searcher,
+            final SyncRollupIndexer action = new SyncRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, searcher,
                     fieldTypeLookup.values().toArray(new MappedFieldType[0]), fieldTypeLookup.get(dateHistoField));
             rollupConsumer.accept(action.triggerAndWaitForCompletion(now));
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
             reader.close();
             dir.close();
         }
@@ -611,9 +611,9 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase {
         private final CountDownLatch latch = new CountDownLatch(1);
         private Exception exc;
 
-        SyncRollupIndexer(Executor executor, RollupJob job, IndexSearcher searcher,
+        SyncRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, IndexSearcher searcher,
                           MappedFieldType[] fieldTypes, MappedFieldType timestampField) {
-            super(executor, job, new AtomicReference<>(IndexerState.STARTED), null);
+            super(threadPool, executorName, job, new AtomicReference<>(IndexerState.STARTED), null);
             this.searcher = searcher;
             this.fieldTypes = fieldTypes;
             this.timestampField = timestampField;

+ 55 - 54
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java

@@ -22,6 +22,8 @@ import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
 import org.elasticsearch.xpack.core.rollup.RollupField;
@@ -35,9 +37,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
@@ -52,14 +52,14 @@ import static org.mockito.Mockito.spy;
 
 public class RollupIndexerStateTests extends ESTestCase {
     private static class EmptyRollupIndexer extends RollupIndexer {
-        EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
+        EmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
                 Map<String, Object> initialPosition, RollupIndexerJobStats stats) {
-            super(executor, job, initialState, initialPosition, stats);
+            super(threadPool, executorName, job, initialState, initialPosition, stats);
         }
 
-        EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
+        EmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
                            Map<String, Object> initialPosition) {
-            super(executor, job, initialState, initialPosition);
+            super(threadPool, executorName, job, initialState, initialPosition);
         }
 
         @Override
@@ -134,14 +134,14 @@ public class RollupIndexerStateTests extends ESTestCase {
     private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer {
         protected CountDownLatch latch;
 
-        DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
+        DelayedEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
                                   Map<String, Object> initialPosition) {
-            super(executor, job, initialState, initialPosition);
+            super(threadPool, executorName, job, initialState, initialPosition);
         }
 
-        DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
+        DelayedEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
                 Map<String, Object> initialPosition, RollupIndexerJobStats stats) {
-            super(executor, job, initialState, initialPosition, stats);
+            super(threadPool, executorName, job, initialState, initialPosition, stats);
         }
 
         private CountDownLatch newLatch() {
@@ -167,17 +167,17 @@ public class RollupIndexerStateTests extends ESTestCase {
         final BiConsumer<IndexerState, Map<String, Object>> saveStateCheck;
         private CountDownLatch latch;
 
-        NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
+        NonEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
                               Map<String, Object> initialPosition, Function<SearchRequest, SearchResponse> searchFunction,
                               Function<BulkRequest, BulkResponse> bulkFunction, Consumer<Exception> failureConsumer) {
-            this(executor, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {});
+            this(threadPool, executorName, job, initialState, initialPosition, searchFunction, bulkFunction, failureConsumer, (i, m) -> {});
         }
 
-        NonEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
+        NonEmptyRollupIndexer(ThreadPool threadPool, String executorName, RollupJob job, AtomicReference<IndexerState> initialState,
                               Map<String, Object> initialPosition, Function<SearchRequest, SearchResponse> searchFunction,
                               Function<BulkRequest, BulkResponse> bulkFunction, Consumer<Exception> failureConsumer,
                               BiConsumer<IndexerState, Map<String, Object>> saveStateCheck) {
-            super(executor, job, initialState, initialPosition);
+            super(threadPool, executorName, job, initialState, initialPosition);
             this.searchFunction = searchFunction;
             this.bulkFunction = bulkFunction;
             this.failureConsumer = failureConsumer;
@@ -242,9 +242,9 @@ public class RollupIndexerStateTests extends ESTestCase {
     public void testStarted() throws Exception {
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
-            RollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null);
+            RollupIndexer indexer = new EmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null);
             indexer.start();
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -257,17 +257,17 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getIndexTotal(), equalTo(0L));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
     public void testIndexing() throws Exception {
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
             AtomicBoolean isFinished = new AtomicBoolean(false);
-            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
+            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) {
                 @Override
                 protected void onFinish(ActionListener<Void> listener) {
                     super.onFinish(ActionListener.wrap(r -> {
@@ -300,7 +300,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getSearchTotal(), equalTo(1L));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
@@ -322,10 +322,11 @@ public class RollupIndexerStateTests extends ESTestCase {
         doAnswer(forwardAndChangeState).when(spyStats).incrementNumInvocations(1L);
         RollupJob job = new RollupJob(config, Collections.emptyMap());
 
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
             AtomicBoolean isFinished = new AtomicBoolean(false);
-            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null, spyStats) {
+            DelayedEmptyRollupIndexer indexer =
+                new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null, spyStats) {
                 @Override
                 protected void onFinish(ActionListener<Void> listener) {
                     super.onFinish(ActionListener.wrap(r -> {
@@ -345,7 +346,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getNumPages(), equalTo(0L));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
@@ -353,10 +354,10 @@ public class RollupIndexerStateTests extends ESTestCase {
         final AtomicBoolean aborted = new AtomicBoolean(false);
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         final CountDownLatch latch = new CountDownLatch(1);
         try {
-            EmptyRollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null) {
+            EmptyRollupIndexer indexer = new EmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) {
                 @Override
                 protected void onFinish(ActionListener<Void> listener) {
                     fail("Should not have called onFinish");
@@ -390,7 +391,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getNumPages(), equalTo(0L));
             assertThat(indexer.getStats().getSearchFailures(), equalTo(0L));
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
@@ -398,13 +399,13 @@ public class RollupIndexerStateTests extends ESTestCase {
         final AtomicBoolean aborted = new AtomicBoolean(false);
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
 
         // Don't use the indexer's latch because we completely change doNextSearch()
         final CountDownLatch doNextSearchLatch = new CountDownLatch(1);
 
         try {
-            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
+            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) {
                 @Override
                 protected void onAbort() {
                     aborted.set(true);
@@ -477,16 +478,16 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
             assertThat(indexer.getStats().getNumPages(), equalTo(1L));
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
     public void testStopIndexing() throws Exception {
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
-            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null);
+            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null);
             final CountDownLatch latch = indexer.newLatch();
             assertFalse(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
             assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
@@ -499,17 +500,17 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
     public void testAbortIndexing() throws Exception {
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
             final AtomicBoolean isAborted = new AtomicBoolean(false);
-            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
+            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) {
                 @Override
                 protected void onAbort() {
                     isAborted.set(true);
@@ -526,17 +527,17 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertBusy(() -> assertTrue(isAborted.get()));
             assertFalse(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
     public void testAbortStarted() {
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
             final AtomicBoolean isAborted = new AtomicBoolean(false);
-            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
+            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) {
                 @Override
                 protected void onAbort() {
                     isAborted.set(true);
@@ -552,17 +553,17 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getNumPages(), equalTo(0L));
             assertFalse(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
     public void testMultipleJobTriggering() throws Exception {
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
             final AtomicBoolean isAborted = new AtomicBoolean(false);
-            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
+            DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null) {
                 @Override
                 protected void onAbort() {
                     isAborted.set(true);
@@ -589,7 +590,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STOPPED)));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
@@ -676,10 +677,10 @@ public class RollupIndexerStateTests extends ESTestCase {
             }
         };
 
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
 
-            NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
+            NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null,
                 searchFunction, bulkFunction, failureConsumer, stateCheck);
             final CountDownLatch latch = indexer.newLatch(1);
             indexer.start();
@@ -701,7 +702,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
@@ -788,10 +789,10 @@ public class RollupIndexerStateTests extends ESTestCase {
             isFinished.set(true);
         };
 
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
 
-            NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
+            NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null,
                 searchFunction, bulkFunction, failureConsumer, doSaveStateCheck);
             final CountDownLatch latch = indexer.newLatch(1);
             indexer.start();
@@ -812,7 +813,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
@@ -836,10 +837,10 @@ public class RollupIndexerStateTests extends ESTestCase {
             }
         };
 
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
 
-            NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
+            NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null,
                 searchFunction, bulkFunction, failureConsumer, stateCheck);
             final CountDownLatch latch = indexer.newLatch(1);
             indexer.start();
@@ -861,7 +862,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 
@@ -949,10 +950,10 @@ public class RollupIndexerStateTests extends ESTestCase {
             }
         };
 
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        final ThreadPool threadPool = new TestThreadPool(getTestName());
         try {
 
-            NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(executor, job, state, null,
+            NonEmptyRollupIndexer indexer = new NonEmptyRollupIndexer(threadPool, ThreadPool.Names.GENERIC, job, state, null,
                 searchFunction, bulkFunction, failureConsumer, stateCheck) {
                 @Override
                 protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
@@ -979,7 +980,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
             assertTrue(indexer.abort());
         } finally {
-            executor.shutdownNow();
+            ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }
     }
 }

+ 5 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

@@ -22,6 +22,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@@ -42,7 +43,6 @@ import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -56,7 +56,8 @@ class ClientTransformIndexer extends TransformIndexer {
     private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex;
 
     ClientTransformIndexer(
-        Executor executor,
+        ThreadPool threadPool,
+        String executorName,
         TransformConfigManager transformsConfigManager,
         CheckpointProvider checkpointProvider,
         TransformProgressGatherer progressGatherer,
@@ -75,7 +76,8 @@ class ClientTransformIndexer extends TransformIndexer {
         boolean shouldStopAtCheckpoint
     ) {
         super(
-            ExceptionsHelper.requireNonNull(executor, "executor"),
+            ExceptionsHelper.requireNonNull(threadPool, "threadPool"),
+            executorName,
             transformsConfigManager,
             checkpointProvider,
             progressGatherer,

+ 4 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.transform.transforms;
 
 import org.elasticsearch.client.ParentTaskAssigningClient;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@@ -20,7 +21,6 @@ import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
 import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
 
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 
 class ClientTransformIndexerBuilder {
@@ -43,11 +43,12 @@ class ClientTransformIndexerBuilder {
         this.initialStats = new TransformIndexerStats();
     }
 
-    ClientTransformIndexer build(Executor executor, TransformContext context) {
+    ClientTransformIndexer build(ThreadPool threadPool, String executorName, TransformContext context) {
         CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig);
 
         return new ClientTransformIndexer(
-            executor,
+            threadPool,
+            executorName,
             transformsConfigManager,
             checkpointProvider,
             new TransformProgressGatherer(parentTaskClient),

+ 6 - 12
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.indexing.IterationResult;
@@ -50,7 +51,6 @@ import java.time.Instant;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -114,7 +114,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     private volatile long lastCheckpointCleanup = 0L;
 
     public TransformIndexer(
-        Executor executor,
+        ThreadPool threadPool,
+        String executorName,
         TransformConfigManager transformsConfigManager,
         CheckpointProvider checkpointProvider,
         TransformProgressGatherer progressGatherer,
@@ -129,7 +130,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         TransformCheckpoint nextCheckpoint,
         TransformContext context
     ) {
-        super(executor, initialState, initialPosition, jobStats);
+        super(threadPool, executorName, initialState, initialPosition, jobStats);
         this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
         this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider");
         this.progressGatherer = ExceptionsHelper.requireNonNull(progressGatherer, "progressGatherer");
@@ -524,7 +525,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
      * @param listener listener to call after done
      */
     private void cleanupOldCheckpoints(ActionListener<Void> listener) {
-        long now = getTime();
+        long now = getTimeNanos() * 1000;
         long checkpointLowerBound = context.getCheckpoint() - NUMBER_OF_CHECKPOINTS_TO_KEEP;
         long lowerBoundEpochMs = now - RETENTION_OF_CHECKPOINTS_MS;
 
@@ -704,7 +705,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     }
 
     @Override
-    protected SearchRequest buildSearchRequest() {
+    protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
         assert nextCheckpoint != null;
 
         SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex()).allowPartialSearchResults(false)
@@ -846,13 +847,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         context.markAsFailed(failureMessage);
     }
 
-    /*
-     * Get the current time, abstracted for the purpose of testing
-     */
-    long getTime() {
-        return System.currentTimeMillis();
-    }
-
     /**
      * Indicates if an audit message should be written when onFinish is called for the given checkpoint
      * We audit the first checkpoint, and then every 10 checkpoints until completedCheckpoint == 99

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

@@ -538,7 +538,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
     }
 
     synchronized void initializeIndexer(ClientTransformIndexerBuilder indexerBuilder) {
-        indexer.set(indexerBuilder.build(getThreadPool().executor(ThreadPool.Names.GENERIC), context));
+        indexer.set(indexerBuilder.build(getThreadPool(), ThreadPool.Names.GENERIC, context));
     }
 
     ThreadPool getThreadPool() {

+ 3 - 3
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

@@ -16,13 +16,12 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
-import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
 import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
+import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
 
 import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -38,7 +37,8 @@ public class ClientTransformIndexerTests extends ESTestCase {
         when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
 
         ClientTransformIndexer indexer = new ClientTransformIndexer(
-            mock(Executor.class),
+            mock(ThreadPool.class),
+            ThreadPool.Names.GENERIC,
             mock(IndexBasedTransformConfigManager.class),
             mock(CheckpointProvider.class),
             new TransformProgressGatherer(mock(Client.class)),

+ 19 - 26
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

@@ -26,6 +26,8 @@ import org.elasticsearch.search.profile.SearchProfileShardResults;
 import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.client.NoOpClient;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.indexing.IterationResult;
 import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@@ -49,9 +51,6 @@ import java.io.StringWriter;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -77,6 +76,7 @@ import static org.mockito.Mockito.verify;
 public class TransformIndexerTests extends ESTestCase {
 
     private Client client;
+    private ThreadPool threadPool;
 
     class MockedTransformIndexer extends TransformIndexer {
 
@@ -88,7 +88,8 @@ public class TransformIndexerTests extends ESTestCase {
         private CountDownLatch latch;
 
         MockedTransformIndexer(
-            Executor executor,
+            ThreadPool threadPool,
+            String executorName,
             IndexBasedTransformConfigManager transformsConfigManager,
             CheckpointProvider checkpointProvider,
             TransformProgressGatherer progressGatherer,
@@ -104,7 +105,8 @@ public class TransformIndexerTests extends ESTestCase {
             Consumer<String> failureConsumer
         ) {
             super(
-                executor,
+                threadPool,
+                executorName,
                 transformsConfigManager,
                 checkpointProvider,
                 progressGatherer,
@@ -216,11 +218,13 @@ public class TransformIndexerTests extends ESTestCase {
     @Before
     public void setUpMocks() {
         client = new NoOpClient(getTestName());
+        threadPool = new TestThreadPool(getTestName());
     }
 
     @After
     public void tearDownClient() {
         client.close();
+        ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
     }
 
     public void testPageSizeAdapt() throws Exception {
@@ -248,8 +252,6 @@ public class TransformIndexerTests extends ESTestCase {
 
         Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
 
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
-        try {
             TransformAuditor auditor = new TransformAuditor(client, "node_1");
             TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
 
@@ -259,7 +261,8 @@ public class TransformIndexerTests extends ESTestCase {
                 searchFunction,
                 bulkFunction,
                 null,
-                executor,
+                threadPool,
+                ThreadPool.Names.GENERIC,
                 auditor,
                 context
             );
@@ -289,10 +292,6 @@ public class TransformIndexerTests extends ESTestCase {
             // assert that page size has been reduced again
             assertThat(pageSizeAfterFirstReduction, greaterThan((long) indexer.getPageSize()));
             assertThat(pageSizeAfterFirstReduction, greaterThan((long) TransformIndexer.MINIMUM_PAGE_SIZE));
-
-        } finally {
-            executor.shutdownNow();
-        }
     }
 
     public void testDoProcessAggNullCheck() {
@@ -330,8 +329,6 @@ public class TransformIndexerTests extends ESTestCase {
         Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
         Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
 
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
-        try {
             TransformAuditor auditor = mock(TransformAuditor.class);
             TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
 
@@ -341,7 +338,8 @@ public class TransformIndexerTests extends ESTestCase {
                 searchFunction,
                 bulkFunction,
                 null,
-                executor,
+                threadPool,
+                ThreadPool.Names.GENERIC,
                 auditor,
                 context
             );
@@ -351,9 +349,6 @@ public class TransformIndexerTests extends ESTestCase {
             assertThat(newPosition.getPosition(), is(nullValue()));
             assertThat(newPosition.isDone(), is(true));
             verify(auditor, times(1)).info(anyString(), anyString());
-        } finally {
-            executor.shutdownNow();
-        }
     }
 
     public void testScriptError() throws Exception {
@@ -397,8 +392,6 @@ public class TransformIndexerTests extends ESTestCase {
             failureMessage.compareAndSet(null, message);
         };
 
-        final ExecutorService executor = Executors.newFixedThreadPool(1);
-        try {
             MockTransformAuditor auditor = new MockTransformAuditor();
             TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
             TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
@@ -409,7 +402,8 @@ public class TransformIndexerTests extends ESTestCase {
                 searchFunction,
                 bulkFunction,
                 failureConsumer,
-                executor,
+                threadPool,
+                ThreadPool.Names.GENERIC,
                 auditor,
                 context
             );
@@ -433,9 +427,6 @@ public class TransformIndexerTests extends ESTestCase {
                 failureMessage.get(),
                 matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]")
             );
-        } finally {
-            executor.shutdownNow();
-        }
     }
 
     private MockedTransformIndexer createMockIndexer(
@@ -444,12 +435,14 @@ public class TransformIndexerTests extends ESTestCase {
         Function<SearchRequest, SearchResponse> searchFunction,
         Function<BulkRequest, BulkResponse> bulkFunction,
         Consumer<String> failureConsumer,
-        final ExecutorService executor,
+        ThreadPool threadPool,
+        String executorName,
         TransformAuditor auditor,
         TransformContext context
     ) {
         return new MockedTransformIndexer(
-            executor,
+            threadPool,
+            executorName,
             mock(IndexBasedTransformConfigManager.class),
             mock(CheckpointProvider.class),
             new TransformProgressGatherer(client),