浏览代码

Refactor IngestDocument reroute recursion detection (#95350)

Joe Gallo 2 年之前
父节点
当前提交
89770c5b69

+ 42 - 5
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

@@ -62,6 +62,19 @@ public final class IngestDocument {
 
     // Contains all pipelines that have been executed for this document
     private final Set<String> executedPipelines = new LinkedHashSet<>();
+
+    /**
+     * An ordered set of the values of the _index that have been used for this document.
+     * <p>
+     * IMPORTANT: This is only updated after a top-level pipeline has run (see {@code IngestService#executePipelines(...)}).
+     * <p>
+     * For example, if a processor changes the _index for a document from 'foo' to 'bar',
+     * and then another processor changes the value back to 'foo', then the overall effect
+     * of the pipeline was that the _index value did not change and so only 'foo' would appear
+     * in the index history.
+     */
+    private Set<String> indexHistory = new LinkedHashSet<>();
+
     private boolean doNoSelfReferencesCheck = false;
     private boolean reroute = false;
 
@@ -70,21 +83,27 @@ public final class IngestDocument {
         this.ingestMetadata = new HashMap<>();
         this.ingestMetadata.put(TIMESTAMP, ctxMap.getMetadata().getNow());
         this.templateModel = initializeTemplateModel();
+
+        // initialize the index history by putting the current index into it
+        this.indexHistory.add(index);
     }
 
+    // note: the rest of these constructors deal with the data-centric view of the IngestDocument, not the execution-centric view.
+    // For example, the copy constructor doesn't populate the `executedPipelines` or `indexHistory` (as well as some other fields),
+    // because those fields are execution-centric.
+
     /**
-     * Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided as argument
+     * Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided.
      */
     public IngestDocument(IngestDocument other) {
         this(
             new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
             deepCopyMap(other.ingestMetadata)
         );
-        this.reroute = other.reroute;
     }
 
     /**
-     * Constructor to create an IngestDocument from its constituent maps.  The maps are shallow copied.
+     * Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
      */
     public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
         Map<String, Object> source;
@@ -107,7 +126,7 @@ public final class IngestDocument {
     }
 
     /**
-     * Constructor to create an IngestDocument from its constituent maps
+     * Constructor to create an IngestDocument from its constituent maps.
      */
     IngestDocument(IngestCtxMap ctxMap, Map<String, Object> ingestMetadata) {
         this.ctxMap = Objects.requireNonNull(ctxMap);
@@ -841,6 +860,24 @@ public final class IngestDocument {
         return pipelineStack;
     }
 
+    /**
+     * Adds an index to the index history for this document, returning true if the index
+     * was added to the index history (i.e. if it wasn't already in the index history).
+     *
+     * @param index the index to potentially add to the index history
+     * @return true if the index history did not already contain the index in question
+     */
+    public boolean updateIndexHistory(String index) {
+        return indexHistory.add(index);
+    }
+
+    /**
+     * @return an unmodifiable view of the document's index history
+     */
+    public Set<String> getIndexHistory() {
+        return Collections.unmodifiableSet(indexHistory);
+    }
+
     /**
      * @return Whether a self referencing check should be performed
      */
@@ -990,7 +1027,7 @@ public final class IngestDocument {
     /**
      * Provides a shallowly read-only, very limited, map-like view of two maps. The only methods that are implemented are
      * {@link Map#get(Object)} and {@link Map#containsKey(Object)}, everything else throws UnsupportedOperationException.
-     *
+     * <p>
      * The overrides map has higher priority than the primary map -- values in that map under some key will take priority over values
      * in the primary map under the same key.
      *

+ 9 - 12
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -69,7 +69,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
@@ -688,9 +687,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         });
 
                         IngestDocument ingestDocument = newIngestDocument(indexRequest);
-                        LinkedHashSet<String> indexRecursionDetection = new LinkedHashSet<>();
-                        indexRecursionDetection.add(indexRequest.index());
-                        executePipelines(pipelines, indexRequest, ingestDocument, documentListener, indexRecursionDetection);
+                        executePipelines(pipelines, indexRequest, ingestDocument, documentListener);
                         i++;
                     }
                 }
@@ -774,8 +771,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         final PipelineIterator pipelines,
         final IndexRequest indexRequest,
         final IngestDocument ingestDocument,
-        final ActionListener<Boolean> listener,
-        final Set<String> indexRecursionDetection
+        final ActionListener<Boolean> listener
     ) {
         assert pipelines.hasNext();
         PipelineSlot slot = pipelines.next();
@@ -859,17 +855,18 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         return; // document failed!
                     }
 
-                    // check for cycles in the visited indices
-                    if (indexRecursionDetection.add(newIndex) == false) {
-                        List<String> indexRoute = new ArrayList<>(indexRecursionDetection);
-                        indexRoute.add(newIndex);
+                    // add the index to the document's index history, and check for cycles in the visited indices
+                    boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false;
+                    if (cycle) {
+                        List<String> indexCycle = new ArrayList<>(ingestDocument.getIndexHistory());
+                        indexCycle.add(newIndex);
                         listener.onFailure(
                             new IllegalStateException(
                                 format(
                                     "index cycle detected while processing pipeline [%s] for document [%s]: %s",
                                     pipelineId,
                                     indexRequest.id(),
-                                    indexRoute
+                                    indexCycle
                                 )
                             )
                         );
@@ -890,7 +887,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                 }
 
                 if (newPipelines.hasNext()) {
-                    executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection);
+                    executePipelines(newPipelines, indexRequest, ingestDocument, listener);
                 } else {
                     // update the index request's source and (potentially) cache the timestamp for TSDB
                     updateIndexRequestSource(indexRequest, ingestDocument);

+ 16 - 0
server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.ingest;
 
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matchers;
 import org.junit.Before;
 
 import java.time.Instant;
@@ -1142,4 +1143,19 @@ public class IngestDocumentTests extends ESTestCase {
         assertFalse(IngestDocument.Metadata.isMetadata("address"));
     }
 
+    public void testIndexHistory() {
+        // the index history contains the original index
+        String index1 = ingestDocument.getFieldValue("_index", String.class);
+        assertThat(index1, equalTo("index"));
+        assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1));
+
+        // it can be updated to include another index
+        String index2 = "another_index";
+        assertTrue(ingestDocument.updateIndexHistory(index2));
+        assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1, index2));
+
+        // an index cycle cannot be introduced, however
+        assertFalse(ingestDocument.updateIndexHistory(index1));
+        assertThat(ingestDocument.getIndexHistory(), Matchers.contains(index1, index2));
+    }
 }