Browse Source

Refactor ForEachProcessor to use iteration instead of recursion (#51104)

* Refactor ForEachProcessor to use iteration instead of recursion

This change makes ForEachProcessor iterative and still non-blocking.
In case of non-async processors we use a single for loop and no recursion at all.
In case of async processors we continue work on either the current thread or the thread
started by the downstream processor, whichever is slower (usually the processor thread).
Everything is synchronised by a single atomic variable.

Relates #50514
Przemko Robakowski 5 years ago
parent
commit
a4da1c9b46

+ 24 - 36
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

@@ -30,9 +30,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
 import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
@@ -50,19 +49,16 @@ import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
 public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {
 
     public static final String TYPE = "foreach";
-    static final int MAX_RECURSE_PER_THREAD = 10;
 
     private final String field;
     private final Processor processor;
     private final boolean ignoreMissing;
-    private final Consumer<Runnable> genericExecutor;
 
-    ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer<Runnable> genericExecutor) {
+    ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
         super(tag);
         this.field = field;
         this.processor = processor;
         this.ignoreMissing = ignoreMissing;
-        this.genericExecutor = genericExecutor;
     }
 
     boolean isIgnoreMissing() {
@@ -79,41 +75,35 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
                 handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
             }
         } else {
-            List<Object> newValues = new CopyOnWriteArrayList<>();
-            innerExecute(0, values, newValues, ingestDocument, handler);
+            innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler);
         }
     }
 
     void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
                       BiConsumer<IngestDocument, Exception> handler) {
+        for (; index < values.size(); index++) {
+            AtomicBoolean shouldContinueHere = new AtomicBoolean();
+            Object value = values.get(index);
+            Object previousValue = document.getIngestMetadata().put("_value", value);
+            int nextIndex = index + 1;
+            processor.execute(document, (result, e) -> {
+                newValues.add(document.getIngestMetadata().put("_value", previousValue));
+                if (e != null || result == null) {
+                    handler.accept(result, e);
+                } else if (shouldContinueHere.getAndSet(true)) {
+                    innerExecute(nextIndex, values, newValues, document, handler);
+                }
+            });
+
+            if (shouldContinueHere.getAndSet(true) == false) {
+                return;
+            }
+        }
+
         if (index == values.size()) {
             document.setFieldValue(field, new ArrayList<>(newValues));
             handler.accept(document, null);
-            return;
         }
-
-        Object value = values.get(index);
-        Object previousValue = document.getIngestMetadata().put("_value", value);
-        final Thread thread = Thread.currentThread();
-        processor.execute(document, (result, e) -> {
-            if (e != null)  {
-                newValues.add(document.getIngestMetadata().put("_value", previousValue));
-                handler.accept(null, e);
-            } else if (result == null) {
-                handler.accept(null, null);
-            } else {
-                newValues.add(document.getIngestMetadata().put("_value", previousValue));
-                if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) {
-                    // we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread
-                    // only fork after 10 recursive calls, then fork every 10 to keep the number of threads down
-                    genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler));
-                } else {
-                    // we are on a different thread (we went asynchronous), it's safe to recurse
-                    // or we have recursed less then 10 times with the same thread, it's safe to recurse
-                    innerExecute(index + 1, values, newValues, document, handler);
-                }
-            }
-        });
     }
 
     @Override
@@ -137,11 +127,9 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
     public static final class Factory implements Processor.Factory {
 
         private final ScriptService scriptService;
-        private final Consumer<Runnable> genericExecutor;
 
-        Factory(ScriptService scriptService, Consumer<Runnable> genericExecutor) {
+        Factory(ScriptService scriptService) {
             this.scriptService = scriptService;
-            this.genericExecutor = genericExecutor;
         }
 
         @Override
@@ -157,7 +145,7 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin
             Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
             Processor processor =
                 ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
-            return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor);
+            return new ForEachProcessor(tag, field, processor, ignoreMissing);
         }
     }
 }

+ 1 - 1
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

@@ -75,7 +75,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
                 entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()),
                 entry(GsubProcessor.TYPE, new GsubProcessor.Factory()),
                 entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)),
-                entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)),
+                entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)),
                 entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)),
                 entry(SortProcessor.TYPE, new SortProcessor.Factory()),
                 entry(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))),

+ 6 - 6
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java

@@ -43,7 +43,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         Processor processor = new TestProcessor(ingestDocument -> { });
         Map<String, Processor.Factory> registry = new HashMap<>();
         registry.put("_name", (r, t, c) -> processor);
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -59,7 +59,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         Processor processor = new TestProcessor(ingestDocument -> { });
         Map<String, Processor.Factory> registry = new HashMap<>();
         registry.put("_name", (r, t, c) -> processor);
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -77,7 +77,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         Map<String, Processor.Factory> registry = new HashMap<>();
         registry.put("_first", (r, t, c) -> processor);
         registry.put("_second", (r, t, c) -> processor);
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -90,7 +90,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCreateWithNonExistingProcessorType() throws Exception {
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
@@ -103,7 +103,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         Processor processor = new TestProcessor(ingestDocument -> { });
         Map<String, Processor.Factory> registry = new HashMap<>();
         registry.put("_name", (r, t, c) -> processor);
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
         Map<String, Object> config = new HashMap<>();
         config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
         Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
@@ -111,7 +111,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCreateWithMissingProcessor() {
-        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
+        ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));

+ 17 - 98
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

@@ -19,9 +19,6 @@
 
 package org.elasticsearch.ingest.common;
 
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.ingest.CompoundProcessor;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Processor;
@@ -29,7 +26,6 @@ import org.elasticsearch.ingest.TestProcessor;
 import org.elasticsearch.ingest.TestTemplateService;
 import org.elasticsearch.script.TemplateScript;
 import org.elasticsearch.test.ESTestCase;
-import org.junit.Before;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,74 +34,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
 import static org.hamcrest.Matchers.equalTo;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
 
 public class ForEachProcessorTests extends ESTestCase {
 
-    @SuppressWarnings("unchecked")
-    private Consumer<Runnable> genericExecutor = (Consumer<Runnable>) mock(Consumer.class);
-    private final ExecutorService direct = EsExecutors.newDirectExecutorService();
-
-    @Before
-    public void setup() {
-        //execute runnable on same thread for simplicity. some tests will override this and actually run async
-        doAnswer(invocationOnMock -> {
-            direct.execute((Runnable) invocationOnMock.getArguments()[0]);
-            return null;
-        }).when(genericExecutor).accept(any(Runnable.class));
-    }
-
-    public void testExecute() throws Exception {
-        ThreadPoolExecutor asyncExecutor =
-            EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS,
-                EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY));
-        doAnswer(invocationOnMock -> {
-            asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]);
-            return null;
-        }).when(genericExecutor).accept(any(Runnable.class));
-
-        List<String> values = new ArrayList<>();
-        values.add("foo");
-        values.add("bar");
-        values.add("baz");
-        IntStream.range(0, ForEachProcessor.MAX_RECURSE_PER_THREAD).forEach(value -> values.add("a"));
-        IngestDocument ingestDocument = new IngestDocument(
-            "_index", "_id", null, null, null, Collections.singletonMap("values", values)
-        );
-
-        ForEachProcessor processor = new ForEachProcessor(
-            "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
-            false, genericExecutor
-        );
-        processor.execute(ingestDocument, (result, e) -> {});
-
-        assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount()));
-        asyncExecutor.shutdown();
-        asyncExecutor.awaitTermination(5, TimeUnit.SECONDS);
-
-        @SuppressWarnings("unchecked")
-        List<String> result = ingestDocument.getFieldValue("values", List.class);
-        assertThat(result.get(0), equalTo("FOO"));
-        assertThat(result.get(1), equalTo("BAR"));
-        assertThat(result.get(2), equalTo("BAZ"));
-        IntStream.range(3, ForEachProcessor.MAX_RECURSE_PER_THREAD + 3).forEach(i -> assertThat(result.get(i), equalTo("A")));
-        verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class));
-    }
-
     public void testExecuteWithAsyncProcessor() throws Exception {
         List<String> values = new ArrayList<>();
         values.add("foo");
@@ -116,7 +53,7 @@ public class ForEachProcessorTests extends ESTestCase {
         );
 
         ForEachProcessor processor = new ForEachProcessor("_tag", "values", new AsyncUpperCaseProcessor("_ingest._value"),
-            false, genericExecutor);
+            false);
         processor.execute(ingestDocument, (result, e) -> {
         });
 
@@ -128,8 +65,6 @@ public class ForEachProcessorTests extends ESTestCase {
             assertThat(result.get(1), equalTo("BAR"));
             assertThat(result.get(2), equalTo("BAZ"));
         });
-
-        verifyZeroInteractions(genericExecutor);
     }
 
     public void testExecuteWithFailure() throws Exception {
@@ -142,7 +77,7 @@ public class ForEachProcessorTests extends ESTestCase {
                 throw new RuntimeException("failure");
             }
         });
-        ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, genericExecutor);
+        ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
         Exception[] exceptions = new Exception[1];
         processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;});
         assertThat(exceptions[0].getMessage(), equalTo("failure"));
@@ -160,7 +95,7 @@ public class ForEachProcessorTests extends ESTestCase {
         Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
         processor = new ForEachProcessor(
             "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
-            false, genericExecutor
+            false
         );
         processor.execute(ingestDocument, (result, e) -> {});
         assertThat(testProcessor.getInvokedCounter(), equalTo(3));
@@ -179,7 +114,7 @@ public class ForEachProcessorTests extends ESTestCase {
             id.setFieldValue("_ingest._value.index", id.getSourceAndMetadata().get("_index"));
             id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
         });
-        ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor);
+        ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
         processor.execute(ingestDocument, (result, e) -> {});
 
         assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
@@ -205,7 +140,7 @@ public class ForEachProcessorTests extends ESTestCase {
         ForEachProcessor processor = new ForEachProcessor(
             "_tag", "values", new SetProcessor("_tag",
             new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
-            (model) -> model.get("other")), false, genericExecutor);
+            (model) -> model.get("other")), false);
         processor.execute(ingestDocument, (result, e) -> {});
 
         assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
@@ -215,17 +150,10 @@ public class ForEachProcessorTests extends ESTestCase {
         assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value"));
     }
 
-    public void testRandom() throws Exception {
-        ThreadPoolExecutor asyncExecutor =
-            EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS,
-                EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY));
-        doAnswer(invocationOnMock -> {
-            asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]);
-            return null;
-        }).when(genericExecutor).accept(any(Runnable.class));
+    public void testRandom() {
         Processor innerProcessor = new Processor() {
                 @Override
-                public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+                public IngestDocument execute(IngestDocument ingestDocument) {
                     String existingValue = ingestDocument.getFieldValue("_ingest._value", String.class);
                     ingestDocument.setFieldValue("_ingest._value", existingValue + ".");
                     return ingestDocument;
@@ -242,28 +170,19 @@ public class ForEachProcessorTests extends ESTestCase {
                 }
         };
         int numValues = randomIntBetween(1, 10000);
-        List<String> values = new ArrayList<>(numValues);
-        for (int i = 0; i < numValues; i++) {
-            values.add("");
-        }
+        List<String> values = IntStream.range(0, numValues).mapToObj(i->"").collect(Collectors.toList());
+
         IngestDocument ingestDocument = new IngestDocument(
             "_index", "_id", null, null, null, Collections.singletonMap("values", values)
         );
 
-        ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor);
+        ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
         processor.execute(ingestDocument, (result, e) -> {});
 
-        assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount()));
-        asyncExecutor.shutdown();
-        asyncExecutor.awaitTermination(5, TimeUnit.SECONDS);
-
         @SuppressWarnings("unchecked")
         List<String> result = ingestDocument.getFieldValue("values", List.class);
         assertThat(result.size(), equalTo(numValues));
-        for (String r : result) {
-            assertThat(r, equalTo("."));
-        }
-        verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class));
+        result.forEach(r -> assertThat(r, equalTo(".")));
     }
 
     public void testModifyFieldsOutsideArray() throws Exception {
@@ -281,7 +200,7 @@ public class ForEachProcessorTests extends ESTestCase {
                 "_tag", "values", new CompoundProcessor(false,
                 Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
                 Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
-        ), false, genericExecutor);
+        ), false);
         processor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values", List.class);
@@ -307,7 +226,7 @@ public class ForEachProcessorTests extends ESTestCase {
 
         TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
                 doc.getFieldValue("_source._value", String.class)));
-        ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, genericExecutor);
+        ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
         forEachProcessor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values", List.class);
@@ -340,8 +259,8 @@ public class ForEachProcessorTests extends ESTestCase {
                 doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH))
         );
         ForEachProcessor processor = new ForEachProcessor(
-                "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, genericExecutor),
-            false, genericExecutor);
+                "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false),
+            false);
         processor.execute(ingestDocument, (result, e) -> {});
 
         List<?> result = ingestDocument.getFieldValue("values1.0.values2", List.class);
@@ -359,7 +278,7 @@ public class ForEachProcessorTests extends ESTestCase {
         );
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
         TestProcessor testProcessor = new TestProcessor(doc -> {});
-        ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, genericExecutor);
+        ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
         processor.execute(ingestDocument, (result, e) -> {});
         assertIngestDocument(originalIngestDocument, ingestDocument);
         assertThat(testProcessor.getInvokedCounter(), equalTo(0));