瀏覽代碼

Correctly handling download_database_on_pipeline_creation within a pipeline processor within a default or final pipeline (#131236) (#131649) (#131653)

Co-authored-by: Keith Massey <keith.massey@elastic.co>
Joe Gallo 2 月之前
父節點
當前提交
ab2b6b93b9

+ 6 - 0
docs/changelog/131236.yaml

@@ -0,0 +1,6 @@
+pr: 131236
+summary: Correctly handling `download_database_on_pipeline_creation` within a pipeline
+  processor within a default or final pipeline
+area: Ingest Node
+type: bug
+issues: []

+ 151 - 14
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

@@ -21,10 +21,12 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.Index;
@@ -247,11 +249,14 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
             return false;
         }
 
-        return clusterState.getMetadata().indices().values().stream().anyMatch(indexMetadata -> {
+        for (IndexMetadata indexMetadata : clusterState.getMetadata().indices().values()) {
             String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
             String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
-            return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);
-        });
+            if (checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline)) {
+                return true;
+            }
+        }
+        return false;
     }
 
     /**
@@ -264,12 +269,26 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
     @SuppressWarnings("unchecked")
     private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
         List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
+        Map<String, PipelineConfiguration> pipelineConfigById = Maps.newHashMapWithExpectedSize(configurations.size());
+        for (PipelineConfiguration configuration : configurations) {
+            pipelineConfigById.put(configuration.getId(), configuration);
+        }
+        // this map is used to keep track of pipelines that have already been checked
+        Map<String, Boolean> pipelineHasGeoProcessorById = Maps.newHashMapWithExpectedSize(configurations.size());
         Set<String> ids = new HashSet<>();
         // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
         for (PipelineConfiguration configuration : configurations) {
             List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY);
-            if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
-                ids.add(configuration.getId());
+            String pipelineName = configuration.getId();
+            if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) {
+                if (hasAtLeastOneGeoipProcessor(
+                    processors,
+                    downloadDatabaseOnPipelineCreation,
+                    pipelineConfigById,
+                    pipelineHasGeoProcessorById
+                )) {
+                    ids.add(pipelineName);
+                }
             }
         }
         return Collections.unmodifiableSet(ids);
@@ -279,13 +298,27 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
      * Check if a list of processor contains at least a geoip processor.
      * @param processors List of processors.
      * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
+     * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
+     *                                    (true), does not reference a geoip processor (false), or we are currently trying to figure that
+     *                                    out (null).
      * @return true if a geoip processor is found in the processor list.
      */
-    private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
+    private static boolean hasAtLeastOneGeoipProcessor(
+        List<Map<String, Object>> processors,
+        boolean downloadDatabaseOnPipelineCreation,
+        Map<String, PipelineConfiguration> pipelineConfigById,
+        Map<String, Boolean> pipelineHasGeoProcessorById
+    ) {
         if (processors != null) {
             // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
             for (Map<String, Object> processor : processors) {
-                if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
+                if (hasAtLeastOneGeoipProcessor(
+                    processor,
+                    downloadDatabaseOnPipelineCreation,
+                    pipelineConfigById,
+                    pipelineHasGeoProcessorById
+                )) {
                     return true;
                 }
             }
@@ -297,10 +330,19 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
      * Check if a processor config is a geoip processor or contains at least a geoip processor.
      * @param processor Processor config.
      * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
+     * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
+     *                                    (true), does not reference a geoip processor (false), or we are currently trying to figure that
+     *                                    out (null).
      * @return true if a geoip processor is found in the processor list.
      */
     @SuppressWarnings("unchecked")
-    private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
+    private static boolean hasAtLeastOneGeoipProcessor(
+        Map<String, Object> processor,
+        boolean downloadDatabaseOnPipelineCreation,
+        Map<String, PipelineConfiguration> pipelineConfigById,
+        Map<String, Boolean> pipelineHasGeoProcessorById
+    ) {
         if (processor == null) {
             return false;
         }
@@ -319,27 +361,51 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
             }
         }
 
-        return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
-            || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
+        return isProcessorWithOnFailureGeoIpProcessor(
+            processor,
+            downloadDatabaseOnPipelineCreation,
+            pipelineConfigById,
+            pipelineHasGeoProcessorById
+        )
+            || isForeachProcessorWithGeoipProcessor(
+                processor,
+                downloadDatabaseOnPipelineCreation,
+                pipelineConfigById,
+                pipelineHasGeoProcessorById
+            )
+            || isPipelineProcessorWithGeoIpProcessor(
+                processor,
+                downloadDatabaseOnPipelineCreation,
+                pipelineConfigById,
+                pipelineHasGeoProcessorById
+            );
     }
 
     /**
      * Check if a processor config has an on_failure clause containing at least a geoip processor.
      * @param processor Processor config.
      * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
+     * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
+     *                                    (true), does not reference a geoip processor (false), or we are currently trying to figure that
+     *                                    out (null).
      * @return true if a geoip processor is found in the processor list.
      */
     @SuppressWarnings("unchecked")
     private static boolean isProcessorWithOnFailureGeoIpProcessor(
         Map<String, Object> processor,
-        boolean downloadDatabaseOnPipelineCreation
+        boolean downloadDatabaseOnPipelineCreation,
+        Map<String, PipelineConfiguration> pipelineConfigById,
+        Map<String, Boolean> pipelineHasGeoProcessorById
     ) {
         // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
         for (Object value : processor.values()) {
             if (value instanceof Map
                 && hasAtLeastOneGeoipProcessor(
                     ((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
-                    downloadDatabaseOnPipelineCreation
+                    downloadDatabaseOnPipelineCreation,
+                    pipelineConfigById,
+                    pipelineHasGeoProcessorById
                 )) {
                 return true;
             }
@@ -351,13 +417,84 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
      * Check if a processor is a foreach processor containing at least a geoip processor.
      * @param processor Processor config.
      * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
+     * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
+     *                                    (true), does not reference a geoip processor (false), or we are currently trying to figure that
+     *                                    out (null).
      * @return true if a geoip processor is found in the processor list.
      */
     @SuppressWarnings("unchecked")
-    private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
+    private static boolean isForeachProcessorWithGeoipProcessor(
+        Map<String, Object> processor,
+        boolean downloadDatabaseOnPipelineCreation,
+        Map<String, PipelineConfiguration> pipelineConfigById,
+        Map<String, Boolean> pipelineHasGeoProcessorById
+    ) {
         final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("foreach");
         return processorConfig != null
-            && hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
+            && hasAtLeastOneGeoipProcessor(
+                (Map<String, Object>) processorConfig.get("processor"),
+                downloadDatabaseOnPipelineCreation,
+                pipelineConfigById,
+                pipelineHasGeoProcessorById
+            );
+    }
+
+    /**
+     * Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates
+     * pipelineHasGeoProcessorById with a result for any pipelines it looks at.
+     * @param processor Processor config.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
+     * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
+     *                                    (true), does not reference a geoip processor (false), or we are currently trying to figure that
+     *                                    out (null).
+     * @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor.
+     */
+    @SuppressWarnings("unchecked")
+    private static boolean isPipelineProcessorWithGeoIpProcessor(
+        Map<String, Object> processor,
+        boolean downloadDatabaseOnPipelineCreation,
+        Map<String, PipelineConfiguration> pipelineConfigById,
+        Map<String, Boolean> pipelineHasGeoProcessorById
+    ) {
+        final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("pipeline");
+        if (processorConfig != null) {
+            String pipelineName = (String) processorConfig.get("name");
+            if (pipelineName != null) {
+                if (pipelineHasGeoProcessorById.containsKey(pipelineName)) {
+                    if (pipelineHasGeoProcessorById.get(pipelineName) == null) {
+                        /*
+                         * If the value is null here, it indicates that this method has been called recursively with the same pipeline name.
+                         * This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at
+                         * server startup time. Instead, we just bail out as quickly as possible. It is possible that this could lead to a
+                         * geo database not being downloaded for the pipeline, but it doesn't really matter since the pipeline was going to
+                         * fail anyway.
+                         */
+                        pipelineHasGeoProcessorById.put(pipelineName, false);
+                    }
+                } else {
+                    List<Map<String, Object>> childProcessors = null;
+                    PipelineConfiguration config = pipelineConfigById.get(pipelineName);
+                    if (config != null) {
+                        childProcessors = (List<Map<String, Object>>) config.getConfig().get(Pipeline.PROCESSORS_KEY);
+                    }
+                    // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors:
+                    pipelineHasGeoProcessorById.put(pipelineName, null);
+                    pipelineHasGeoProcessorById.put(
+                        pipelineName,
+                        hasAtLeastOneGeoipProcessor(
+                            childProcessors,
+                            downloadDatabaseOnPipelineCreation,
+                            pipelineConfigById,
+                            pipelineHasGeoProcessorById
+                        )
+                    );
+                }
+                return pipelineHasGeoProcessorById.get(pipelineName);
+            }
+        }
+        return false;
     }
 
     private static final TimeValue MASTER_TIMEOUT = TimeValue.MAX_VALUE;

+ 107 - 0
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java

@@ -9,6 +9,7 @@
 
 package org.elasticsearch.ingest.geoip;
 
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -16,6 +17,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.ingest.IngestMetadata;
 import org.elasticsearch.ingest.PipelineConfiguration;
 import org.elasticsearch.test.ESTestCase;
@@ -27,6 +29,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -65,6 +68,101 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
 
     }
 
+    /*
+     * This tests that if a default or final pipeline has a pipeline processor that has a geoip processor that has
+     * download_database_on_pipeline_creation set to false, then we will correctly acknowledge that the pipeline has a geoip processor so
+     * that we download it appropriately.
+     */
+    public void testHasAtLeastOneGeoipProcessorInPipelineProcessorWhenDownloadDatabaseOnPipelineCreationIsFalse() throws IOException {
+        String innerInnerPipelineJson = """
+            {
+              "processors":[""" + getGeoIpProcessor(false) + """
+              ]
+            }
+            """;
+        String innerPipelineJson = """
+                        {
+              "processors":[{"pipeline": {"name": "innerInnerPipeline"}}
+              ]
+            }
+            """;
+        String outerPipelineJson = """
+                        {
+              "processors":[{"pipeline": {"name": "innerPipeline"}}
+              ]
+            }
+            """;
+        IngestMetadata ingestMetadata = new IngestMetadata(
+            Map.of(
+                "innerInnerPipeline",
+                new PipelineConfiguration("innerInnerPipeline", new BytesArray(innerInnerPipelineJson), XContentType.JSON),
+                "innerPipeline",
+                new PipelineConfiguration("innerPipeline", new BytesArray(innerPipelineJson), XContentType.JSON),
+                "outerPipeline",
+                new PipelineConfiguration("outerPipeline", new BytesArray(outerPipelineJson), XContentType.JSON)
+            )
+        );
+        // The pipeline is not used in any index, expected to return false.
+        var clusterState = clusterStateWithIndex(b -> {}, ingestMetadata);
+        assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+
+        // The pipeline is set as default pipeline in an index, expected to return true.
+        clusterState = clusterStateWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "outerPipeline"), ingestMetadata);
+        assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+
+        // The pipeline is set as final pipeline in an index, expected to return true.
+        clusterState = clusterStateWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "outerPipeline"), ingestMetadata);
+        assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+    }
+
+    public void testHasAtLeastOneGeoipProcessorRecursion() throws IOException {
+        /*
+         * The pipeline in this test is invalid -- it has a cycle from outerPipeline -> innerPipeline -> innerInnerPipeline ->
+         * innerPipeline. Since this method is called at server startup, we want to make sure that we don't get a StackOverflowError and
+         * that we don't throw any kind of validation exception (since that would be an unexpected change of behavior).
+         */
+        String innerInnerPipelineJson = """
+            {
+              "processors":[""" + getGeoIpProcessor(false) + """
+              , {"pipeline": {"name": "innerPipeline"}}
+              ]
+            }
+            """;
+        String innerPipelineJson = """
+                        {
+              "processors":[{"pipeline": {"name": "innerInnerPipeline"}}
+              ]
+            }
+            """;
+        String outerPipelineJson = """
+                        {
+              "processors":[{"pipeline": {"name": "innerPipeline"}}
+              ]
+            }
+            """;
+        IngestMetadata ingestMetadata = new IngestMetadata(
+            Map.of(
+                "innerInnerPipeline",
+                new PipelineConfiguration("innerInnerPipeline", new BytesArray(innerInnerPipelineJson), XContentType.JSON),
+                "innerPipeline",
+                new PipelineConfiguration("innerPipeline", new BytesArray(innerPipelineJson), XContentType.JSON),
+                "outerPipeline",
+                new PipelineConfiguration("outerPipeline", new BytesArray(outerPipelineJson), XContentType.JSON)
+            )
+        );
+        // The pipeline is not used in any index, expected to return false.
+        var clusterState = clusterStateWithIndex(b -> {}, ingestMetadata);
+        assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+
+        // The pipeline is set as default pipeline in an index, expected to return true.
+        clusterState = clusterStateWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "outerPipeline"), ingestMetadata);
+        assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+
+        // The pipeline is set as final pipeline in an index, expected to return true.
+        clusterState = clusterStateWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "outerPipeline"), ingestMetadata);
+        assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+    }
+
     public void testHasAtLeastOneGeoipProcessor() throws IOException {
         final IngestMetadata[] ingestMetadata = new IngestMetadata[1];
         ClusterState clusterState = mock(ClusterState.class);
@@ -282,4 +380,13 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
             return Strings.toString(builder);
         }
     }
+
+    private ClusterState clusterStateWithIndex(Consumer<Settings.Builder> consumer, IngestMetadata ingestMetadata) {
+        var builder = indexSettings(IndexVersion.current(), 1, 1);
+        consumer.accept(builder);
+        var indexMetadata = new IndexMetadata.Builder("index").settings(builder.build()).build();
+        return ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().putCustom(IngestMetadata.TYPE, ingestMetadata).put(indexMetadata, false))
+            .build();
+    }
 }