Browse Source

Foreach processor - fork recursive call (#50514)

A very large number of recursive calls can cause a stack overflow
exception. This commit forks the recursive calls for non-async
processors. Once forked, each thread will handle at most 10
recursive calls to help keep the stack size and thread count 
down to a reasonable size.
Jake Landis 5 years ago
parent
commit
c3de284360

+ 21 - 7
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

@@ -23,6 +23,8 @@ import org.elasticsearch.ingest.AbstractProcessor;
 import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.ingest.WrappingProcessor;
+import org.elasticsearch.script.ScriptService;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -30,9 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.BiConsumer;
-
-import org.elasticsearch.ingest.WrappingProcessor;
-import org.elasticsearch.script.ScriptService;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
 import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
@@ -50,16 +50,19 @@ import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
 public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {
 
     public static final String TYPE = "foreach";
+    static final int MAX_RECURSE_PER_THREAD = 10;
 
     private final String field;
     private final Processor processor;
     private final boolean ignoreMissing;
+    private final Consumer<Runnable> genericExecutor;
 
-    ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
+    ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer<Runnable> genericExecutor) {
         super(tag);
         this.field = field;
         this.processor = processor;
         this.ignoreMissing = ignoreMissing;
+        this.genericExecutor = genericExecutor;
     }
 
     boolean isIgnoreMissing() {
@@ -91,6 +94,7 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
 
         Object value = values.get(index);
         Object previousValue = document.getIngestMetadata().put("_value", value);
+        final Thread thread = Thread.currentThread();
         processor.execute(document, (result, e) -> {
             if (e != null)  {
                 newValues.add(document.getIngestMetadata().put("_value", previousValue));
@@ -99,7 +103,15 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
                 handler.accept(null, null);
             } else {
                 newValues.add(document.getIngestMetadata().put("_value", previousValue));
-                innerExecute(index + 1, values, newValues, document, handler);
+                if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) {
+                    // we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread
+                    // only fork after 10 recursive calls, then fork every 10 to keep the number of threads down
+                    genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler));
+                } else {
+                    // we are on a different thread (we went asynchronous), it's safe to recurse
+                    // or we have recursed less then 10 times with the same thread, it's safe to recurse
+                    innerExecute(index + 1, values, newValues, document, handler);
+                }
             }
         });
     }
@@ -125,9 +137,11 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
     public static final class Factory implements Processor.Factory {
 
         private final ScriptService scriptService;
+        private final Consumer<Runnable> genericExecutor;
 
-        Factory(ScriptService scriptService) {
+        Factory(ScriptService scriptService, Consumer<Runnable> genericExecutor) {
             this.scriptService = scriptService;
+            this.genericExecutor = genericExecutor;
         }
 
         @Override
@@ -143,7 +157,7 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
             Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
             Processor processor =
                 ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
-            return new ForEachProcessor(tag, field, processor, ignoreMissing);
+            return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor);
         }
     }
 }

+ 1 - 1
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

@@ -75,7 +75,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
                 entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()),
                 entry(GsubProcessor.TYPE, new GsubProcessor.Factory()),
                 entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)),
-                entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)),
+                entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)),
                 entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)),
                 entry(SortProcessor.TYPE, new SortProcessor.Factory()),
                 entry(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))),

+ 8 - 6
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java

@@ -29,6 +29,7 @@ import org.hamcrest.Matchers;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
@@ -36,12 +37,13 @@ import static org.mockito.Mockito.mock;
 public class ForEachProcessorFactoryTests extends ESTestCase {
 
     private final ScriptService scriptService = mock(ScriptService.class);
+    private final Consumer<Runnable> genericExecutor = Runnable::run;
 
     public void testCreate() throws Exception {
         Processor processor = new TestProcessor(ingestDocument -> { });
         Map<String, Processor.Factory> registry = new HashMap<>();
         registry.put("_name", (r, t, c) -> processor);
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -57,7 +59,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         Processor processor = new TestProcessor(ingestDocument -> { });
         Map<String, Processor.Factory> registry = new HashMap<>();
         registry.put("_name", (r, t, c) -> processor);
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -75,7 +77,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         Map<String, Processor.Factory> registry = new HashMap<>();
         registry.put("_first", (r, t, c) -> processor);
         registry.put("_second", (r, t, c) -> processor);
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -88,7 +90,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCreateWithNonExistingProcessorType() throws Exception {
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
@@ -101,7 +103,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         Processor processor = new TestProcessor(ingestDocument -> { });
         Map<String, Processor.Factory> registry = new HashMap<>();
         registry.put("_name", (r, t, c) -> processor);
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
         Map<String, Object> config = new HashMap<>();
         config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
         Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
@@ -109,7 +111,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCreateWithMissingProcessor() {
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));

+ 132 - 11
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

@@ -19,6 +19,9 @@
 
 package org.elasticsearch.ingest.common;
 
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.ingest.CompoundProcessor;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Processor;
@@ -26,6 +29,7 @@ import org.elasticsearch.ingest.TestProcessor;
 import org.elasticsearch.ingest.TestTemplateService;
 import org.elasticsearch.script.TemplateScript;
 import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,32 +38,98 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
 
 import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 public class ForEachProcessorTests extends ESTestCase {
 
+    @SuppressWarnings("unchecked")
+    private Consumer<Runnable> genericExecutor = (Consumer<Runnable>) mock(Consumer.class);
+    private final ExecutorService direct = EsExecutors.newDirectExecutorService();
+
+    @Before
+    public void setup() {
+        //execute runnable on same thread for simplicity. some tests will override this and actually run async
+        doAnswer(invocationOnMock -> {
+            direct.execute((Runnable) invocationOnMock.getArguments()[0]);
+            return null;
+        }).when(genericExecutor).accept(any(Runnable.class));
+    }
+
     public void testExecute() throws Exception {
+        ThreadPoolExecutor asyncExecutor =
+            EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS,
+                EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY));
+        doAnswer(invocationOnMock -> {
+            asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]);
+            return null;
+        }).when(genericExecutor).accept(any(Runnable.class));
+
         List<String> values = new ArrayList<>();
         values.add("foo");
         values.add("bar");
         values.add("baz");
+        IntStream.range(0, ForEachProcessor.MAX_RECURSE_PER_THREAD).forEach(value -> values.add("a"));
         IngestDocument ingestDocument = new IngestDocument(
             "_index", "_id", null, null, null, Collections.singletonMap("values", values)
         );
 
         ForEachProcessor processor = new ForEachProcessor(
             "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
-            false
+            false, genericExecutor
         );
         processor.execute(ingestDocument, (result, e) -> {});
 
+        assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount()));
+        asyncExecutor.shutdown();
+        asyncExecutor.awaitTermination(5, TimeUnit.SECONDS);
+
         @SuppressWarnings("unchecked")
         List<String> result = ingestDocument.getFieldValue("values", List.class);
         assertThat(result.get(0), equalTo("FOO"));
         assertThat(result.get(1), equalTo("BAR"));
         assertThat(result.get(2), equalTo("BAZ"));
+        IntStream.range(3, ForEachProcessor.MAX_RECURSE_PER_THREAD + 3).forEach(i -> assertThat(result.get(i), equalTo("A")));
+        verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class));
+    }
+
+    public void testExecuteWithAsyncProcessor() throws Exception {
+        List<String> values = new ArrayList<>();
+        values.add("foo");
+        values.add("bar");
+        values.add("baz");
+        IngestDocument ingestDocument = new IngestDocument(
+            "_index", "_id", null, null, null, Collections.singletonMap("values", values)
+        );
+
+        ForEachProcessor processor = new ForEachProcessor("_tag", "values", new AsyncUpperCaseProcessor("_ingest._value"),
+            false, genericExecutor);
+        processor.execute(ingestDocument, (result, e) -> {
+        });
+
+        assertBusy(() -> {
+            @SuppressWarnings("unchecked")
+            List<String> result = ingestDocument.getFieldValue("values", List.class);
+            assertEquals(values.size(), result.size());
+            assertThat(result.get(0), equalTo("FOO"));
+            assertThat(result.get(1), equalTo("BAR"));
+            assertThat(result.get(2), equalTo("BAZ"));
+        });
+
+        verifyZeroInteractions(genericExecutor);
     }
 
     public void testExecuteWithFailure() throws Exception {
@@ -72,7 +142,7 @@ public class ForEachProcessorTests extends ESTestCase {
                 throw new RuntimeException("failure");
             }
         });
-        ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
+        ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, genericExecutor);
         Exception[] exceptions = new Exception[1];
         processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;});
         assertThat(exceptions[0].getMessage(), equalTo("failure"));
@@ -90,7 +160,7 @@ public class ForEachProcessorTests extends ESTestCase {
         Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
         processor = new ForEachProcessor(
             "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
-            false
+            false, genericExecutor
         );
         processor.execute(ingestDocument, (result, e) -> {});
         assertThat(testProcessor.getInvokedCounter(), equalTo(3));
@@ -109,7 +179,7 @@ public class ForEachProcessorTests extends ESTestCase {
             id.setFieldValue("_ingest._value.index", id.getSourceAndMetadata().get("_index"));
             id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
         });
-        ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
+        ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor);
         processor.execute(ingestDocument, (result, e) -> {});
 
         assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
@@ -135,7 +205,7 @@ public class ForEachProcessorTests extends ESTestCase {
         ForEachProcessor processor = new ForEachProcessor(
             "_tag", "values", new SetProcessor("_tag",
             new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
-            (model) -> model.get("other")), false);
+            (model) -> model.get("other")), false, genericExecutor);
         processor.execute(ingestDocument, (result, e) -> {});
 
         assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
@@ -146,6 +216,13 @@ public class ForEachProcessorTests extends ESTestCase {
     }
 
     public void testRandom() throws Exception {
+        ThreadPoolExecutor asyncExecutor =
+            EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS,
+                EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY));
+        doAnswer(invocationOnMock -> {
+            asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]);
+            return null;
+        }).when(genericExecutor).accept(any(Runnable.class));
         Processor innerProcessor = new Processor() {
                 @Override
                 public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
@@ -164,7 +241,7 @@ public class ForEachProcessorTests extends ESTestCase {
                     return null;
                 }
         };
-        int numValues = randomIntBetween(1, 32);
+        int numValues = randomIntBetween(1, 10000);
         List<String> values = new ArrayList<>(numValues);
         for (int i = 0; i < numValues; i++) {
             values.add("");
@@ -173,14 +250,20 @@ public class ForEachProcessorTests extends ESTestCase {
             "_index", "_id", null, null, null, Collections.singletonMap("values", values)
         );
 
-        ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
+        ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor);
         processor.execute(ingestDocument, (result, e) -> {});
+
+        assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount()));
+        asyncExecutor.shutdown();
+        asyncExecutor.awaitTermination(5, TimeUnit.SECONDS);
+
         @SuppressWarnings("unchecked")
         List<String> result = ingestDocument.getFieldValue("values", List.class);
         assertThat(result.size(), equalTo(numValues));
         for (String r : result) {
             assertThat(r, equalTo("."));
         }
+        verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class));
     }
 
     public void testModifyFieldsOutsideArray() throws Exception {
@@ -198,7 +281,7 @@ public class ForEachProcessorTests extends ESTestCase {
                 "_tag", "values", new CompoundProcessor(false,
                 Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
                 Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
-        ), false);
+        ), false, genericExecutor);
         processor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values", List.class);
@@ -224,7 +307,7 @@ public class ForEachProcessorTests extends ESTestCase {
 
         TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
                 doc.getFieldValue("_source._value", String.class)));
-        ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
+        ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, genericExecutor);
         forEachProcessor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values", List.class);
@@ -257,7 +340,8 @@ public class ForEachProcessorTests extends ESTestCase {
                 doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH))
         );
         ForEachProcessor processor = new ForEachProcessor(
-                "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
+                "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, genericExecutor),
+            false, genericExecutor);
         processor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values1.0.values2", List.class);
@@ -275,10 +359,47 @@ public class ForEachProcessorTests extends ESTestCase {
         );
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
         TestProcessor testProcessor = new TestProcessor(doc -> {});
-        ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
+        ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, genericExecutor);
         processor.execute(ingestDocument, (result, e) -> {});
         assertIngestDocument(originalIngestDocument, ingestDocument);
         assertThat(testProcessor.getInvokedCounter(), equalTo(0));
     }
 
+    private class AsyncUpperCaseProcessor implements Processor {
+
+        private final String field;
+
+        private AsyncUpperCaseProcessor(String field) {
+            this.field = field;
+        }
+
+        @Override
+        public void execute(IngestDocument document, BiConsumer<IngestDocument, Exception> handler) {
+            new Thread(() -> {
+                try {
+                    String value = document.getFieldValue(field, String.class, false);
+                    document.setFieldValue(field, value.toUpperCase(Locale.ROOT));
+                    handler.accept(document, null);
+                } catch (Exception e) {
+                    handler.accept(null, e);
+                }
+            }).start();
+        }
+
+        @Override
+        public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+            throw new UnsupportedOperationException("this is an async processor, don't call this");
+        }
+
+        @Override
+        public String getType() {
+            return "uppercase-async";
+        }
+
+        @Override
+        public String getTag() {
+            return getType();
+        }
+    }
+
 }

+ 19 - 0
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml

@@ -43,3 +43,22 @@ teardown:
         index: test
         id: 1
   - match: { _source.values: ["FOO", "BAR", "BAZ"] }
+
+ #exceeds the recurse max per thread and will runs some of these on a different thread
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "my_pipeline"
+        body: >
+          {
+            "values": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u",
+                       "v", "w", "x", "y", "z"]
+          }
+
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.values: ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U",
+                              "V", "W", "X", "Y", "Z"] }

+ 2 - 1
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -101,9 +101,10 @@ public class IngestService implements ClusterStateApplier {
                 threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
                 (delay, command) -> threadPool.schedule(
                     command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
-                ), this, client
+                ), this, client, threadPool.generic()::execute
             )
         );
+
         this.threadPool = threadPool;
     }
 

+ 5 - 1
server/src/main/java/org/elasticsearch/ingest/Processor.java

@@ -29,6 +29,7 @@ import org.elasticsearch.threadpool.Scheduler;
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 
 /**
@@ -122,6 +123,8 @@ public interface Processor {
 
         public final IngestService ingestService;
 
+        public final Consumer<Runnable> genericExecutor;
+
         /**
          * Provides scheduler support
          */
@@ -134,7 +137,7 @@ public interface Processor {
 
         public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,  ThreadContext threadContext,
                           LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
-                          IngestService ingestService, Client client) {
+                          IngestService ingestService, Client client, Consumer<Runnable> genericExecutor ) {
             this.env = env;
             this.scriptService = scriptService;
             this.threadContext = threadContext;
@@ -143,6 +146,7 @@ public interface Processor {
             this.scheduler = scheduler;
             this.ingestService = ingestService;
             this.client = client;
+            this.genericExecutor = genericExecutor;
         }
 
     }

+ 17 - 11
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -60,6 +60,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.CustomTypeSafeMatcher;
+import org.junit.Before;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 
@@ -111,10 +112,18 @@ public class IngestServiceTests extends ESTestCase {
         }
     };
 
+    private ThreadPool threadPool;
+
+    @Before
+    public void setup(){
+        threadPool = mock(ThreadPool.class);
+        ExecutorService executorService = EsExecutors.newDirectExecutorService();
+        when(threadPool.generic()).thenReturn(executorService);
+        when(threadPool.executor(anyString())).thenReturn(executorService);
+    }
     public void testIngestPlugin() {
-        ThreadPool tp = mock(ThreadPool.class);
         Client client = mock(Client.class);
-        IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null,
+        IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null,
             null, Collections.singletonList(DUMMY_PLUGIN), client);
         Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
         assertTrue(factories.containsKey("foo"));
@@ -122,19 +131,15 @@ public class IngestServiceTests extends ESTestCase {
     }
 
     public void testIngestPluginDuplicate() {
-        ThreadPool tp = mock(ThreadPool.class);
         Client client = mock(Client.class);
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
-            new IngestService(mock(ClusterService.class), tp, null, null,
+            new IngestService(mock(ClusterService.class), threadPool, null, null,
             null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client));
         assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
     }
 
     public void testExecuteIndexPipelineDoesNotExist() {
-        ThreadPool threadPool = mock(ThreadPool.class);
         Client client = mock(Client.class);
-        final ExecutorService executorService = EsExecutors.newDirectExecutorService();
-        when(threadPool.executor(anyString())).thenReturn(executorService);
         IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null,
             null, Collections.singletonList(DUMMY_PLUGIN), client);
         final IndexRequest indexRequest =
@@ -1188,10 +1193,9 @@ public class IngestServiceTests extends ESTestCase {
         };
 
         // Create ingest service:
-        ThreadPool tp = mock(ThreadPool.class);
         Client client = mock(Client.class);
         IngestService ingestService =
-            new IngestService(mock(ClusterService.class), tp, null, null, null, List.of(testPlugin), client);
+            new IngestService(mock(ClusterService.class), threadPool, null, null, null, List.of(testPlugin), client);
         ingestService.addIngestClusterStateListener(ingestClusterStateListener);
 
         // Create pipeline and apply the resulting cluster state, which should update the counter in the right order:
@@ -1259,9 +1263,11 @@ public class IngestServiceTests extends ESTestCase {
     }
 
     private static IngestService createWithProcessors(Map<String, Processor.Factory> processors) {
-        ThreadPool threadPool = mock(ThreadPool.class);
+
         Client client = mock(Client.class);
-        final ExecutorService executorService = EsExecutors.newDirectExecutorService();
+        ThreadPool threadPool = mock(ThreadPool.class);
+        ExecutorService executorService = EsExecutors.newDirectExecutorService();
+        when(threadPool.generic()).thenReturn(executorService);
         when(threadPool.executor(anyString())).thenReturn(executorService);
         return new IngestService(mock(ClusterService.class), threadPool, null, null,
             null, Collections.singletonList(new IngestPlugin() {

+ 4 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -44,6 +45,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -105,6 +107,8 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
     @Before
     public void setUpVariables() {
         ThreadPool tp = mock(ThreadPool.class);
+        ExecutorService executorService = EsExecutors.newDirectExecutorService();
+        when(tp.generic()).thenReturn(executorService);
         client = mock(Client.class);
         clusterService = mock(ClusterService.class);
         Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build();

+ 4 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -44,6 +45,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
@@ -71,6 +73,8 @@ public class InferenceProcessorFactoryTests extends ESTestCase {
     @Before
     public void setUpVariables() {
         ThreadPool tp = mock(ThreadPool.class);
+        ExecutorService executorService = EsExecutors.newDirectExecutorService();
+        when(tp.generic()).thenReturn(executorService);
         client = mock(Client.class);
         Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build();
         ClusterSettings clusterSettings = new ClusterSettings(settings,