Browse Source

Enable analytics geoip in behavioral analytics. (#96624)

* When using a managed pipeline GeoIpDownloader is triggered only when an index exists for the pipeline.

* When using a managed pipeline GeoIpDownloader is triggered only when an index exists for the pipeline.

* Adding the geoip processor back

* Adding tags to the events mapping.

* Fix a forbidden API call into tests.

* lint

* Adding an integration tests for managed pipelines.

* lint

* Add a geoip_database_lazy_download param to pipelines and use it instead of managed.

* Fix a edge case: pipeline can be set after index is created.

* lint.

* Update docs/changelog/96624.yaml

* Update 96624.yaml

* Uses a processor setting (download_database_on_pipeline_creation) to decide database download strategy.

* Removing debug instruction.

* Improved documentation.

* Improved the way to check for referenced pipelines.

* Fixing an error in test.

* Improved integration tests.

* Lint.

* Fix failing tests.

* Fix failing tests (2).

* Adding javadoc.

* lint javadoc.

* Using a set instead of a list to store checked pipelines.
Aurélien FOUCRET 2 years ago
parent
commit
dd1d157b47

+ 5 - 0
docs/changelog/96624.yaml

@@ -0,0 +1,5 @@
+pr: 96624
+summary: Enable analytics geoip in behavioral analytics
+area: "Application"
+type: feature
+issues: []

+ 9 - 8
docs/reference/ingest/processors/geoip.asciidoc

@@ -14,7 +14,7 @@ CC BY-SA 4.0 license. It automatically downloads these databases if your nodes c
 
 * `ingest.geoip.downloader.eager.download` is set to true
 * your cluster has at least one pipeline with a `geoip` processor
- 
+
 {es} automatically downloads updates for these databases from the Elastic GeoIP endpoint:
 https://geoip.elastic.co/v1/database. To get download statistics for these
 updates, use the <<geoip-stats-api,GeoIP stats API>>.
@@ -33,13 +33,14 @@ field instead.
 .`geoip` options
 [options="header"]
 |======
-| Name                   | Required  | Default                                                                            | Description
-| `field`                | yes       | -                                                                                  | The field to get the ip address from for the geographical lookup.
-| `target_field`         | no        | geoip                                                                              | The field that will hold the geographical information looked up from the MaxMind database.
-| `database_file`        | no        | GeoLite2-City.mmdb                                                                 | The database filename referring to a database the module ships with (GeoLite2-City.mmdb, GeoLite2-Country.mmdb, or GeoLite2-ASN.mmdb) or a custom database in the `ingest-geoip` config directory.
-| `properties`           | no        | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] *   | Controls what properties are added to the `target_field` based on the geoip lookup.
-| `ignore_missing`       | no        | `false`                                                                            | If `true` and `field` does not exist, the processor quietly exits without modifying the document
-| `first_only`           | no        | `true`                                                                             | If `true` only first found geoip data will be returned, even if `field` contains array
+| Name                                     | Required  | Default                                                                            | Description
+| `field`                                  | yes       | -                                                                                  | The field to get the ip address from for the geographical lookup.
+| `target_field`                           | no        | geoip                                                                              | The field that will hold the geographical information looked up from the MaxMind database.
+| `database_file`                          | no        | GeoLite2-City.mmdb                                                                 | The database filename referring to a database the module ships with (GeoLite2-City.mmdb, GeoLite2-Country.mmdb, or GeoLite2-ASN.mmdb) or a custom database in the `ingest-geoip` config directory.
+| `properties`                             | no        | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] *   | Controls what properties are added to the `target_field` based on the geoip lookup.
+| `ignore_missing`                         | no        | `false`                                                                            | If `true` and `field` does not exist, the processor quietly exits without modifying the document
+| `first_only`                             | no        | `true`                                                                             | If `true` only first found geoip data will be returned, even if `field` contains array
+| `download_database_on_pipeline_creation` | no        | `true`                                                                             | If `true` (and if `ingest.geoip.downloader.eager.download` is `false`), the missing database is downloaded when the pipeline is created. Else, the download is triggered by when the pipeline is used as the `default_pipeline` or `final_pipeline` in an index.
 |======
 
 *Depends on what is available in `database_file`:

+ 59 - 0
modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

@@ -18,11 +18,13 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.MatchQueryBuilder;
 import org.elasticsearch.index.query.RangeQueryBuilder;
@@ -150,6 +152,7 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
     @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/75221")
     public void testInvalidTimestamp() throws Exception {
         assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
+        setupDatabasesInConfigDirectory();
         putGeoIpPipeline();
         updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
         assertBusy(() -> {
@@ -283,6 +286,42 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
         }, 2, TimeUnit.MINUTES);
     }
 
+    public void testDoNotDownloadDatabaseOnPipelineCreation() throws Exception {
+        assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
+        String pipelineId = randomIdentifier();
+
+        // Removing databases from tmp dir. So we can test the downloader.
+        deleteDatabasesInConfigDirectory();
+
+        // Enabling the downloader.
+        putGeoIpPipeline("_id", false);
+        updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
+        assertBusy(() -> assertNotNull(getTask()));
+
+        // Creating a pipeline containing a geo processor with lazy download enable.
+        // Download should not be triggered and task state should stay null.
+        putGeoIpPipeline(pipelineId, false);
+        assertNull(getTask().getState());
+
+        // Creating an index which does not reference the pipeline should not trigger the database download.
+        String indexIdentifier = randomIdentifier();
+        assertAcked(client().admin().indices().prepareCreate(indexIdentifier).get());
+        assertNull(getTask().getState());
+
+        // Set the pipeline as default_pipeline or final_pipeline for the index.
+        // This should trigger the database download.
+        Setting<String> pipelineSetting = randomFrom(IndexSettings.FINAL_PIPELINE, IndexSettings.DEFAULT_PIPELINE);
+        Settings indexSettings = Settings.builder().put(pipelineSetting.getKey(), pipelineId).build();
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexIdentifier).setSettings(indexSettings).get());
+        assertBusy(() -> {
+            GeoIpTaskState state = getGeoIpTaskState();
+            assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
+        }, 2, TimeUnit.MINUTES);
+
+        // Remove the created index.
+        assertAcked(client().admin().indices().prepareDelete(indexIdentifier).get());
+    }
+
     @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
     public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
         assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
@@ -450,6 +489,17 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
      * @throws IOException
      */
     private void putGeoIpPipeline(String pipelineId) throws IOException {
+        putGeoIpPipeline(pipelineId, true);
+    }
+
+    /**
+     * This creates a pipeline named pipelineId with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is
+     * enabled).
+     * @param pipelineId The name of the new pipeline with a geoip processor
+    *  @param downloadDatabaseOnPipelineCreation Indicates whether the pipeline creation should trigger database download or not.
+     * @throws IOException
+     */
+    private void putGeoIpPipeline(String pipelineId, boolean downloadDatabaseOnPipelineCreation) throws IOException {
         BytesReference bytes;
         try (XContentBuilder builder = JsonXContent.contentBuilder()) {
             builder.startObject();
@@ -463,6 +513,9 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
                             builder.field("field", "ip");
                             builder.field("target_field", "ip-city");
                             builder.field("database_file", "GeoLite2-City.mmdb");
+                            if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
+                                builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
+                            }
                         }
                         builder.endObject();
                     }
@@ -474,6 +527,9 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
                             builder.field("field", "ip");
                             builder.field("target_field", "ip-country");
                             builder.field("database_file", "GeoLite2-Country.mmdb");
+                            if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
+                                builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
+                            }
                         }
                         builder.endObject();
                     }
@@ -485,6 +541,9 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
                             builder.field("field", "ip");
                             builder.field("target_field", "ip-asn");
                             builder.field("database_file", "GeoLite2-ASN.mmdb");
+                            if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
+                                builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
+                            }
                         }
                         builder.endObject();
                     }

+ 99 - 17
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.ingest.IngestMetadata;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.ingest.Pipeline;
@@ -43,11 +44,14 @@ import org.elasticsearch.transport.RemoteTransportException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
 import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
+import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation;
 
 /**
  * Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node.
@@ -207,7 +211,14 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
             }
         }
 
-        if (event.metadataChanged() && event.changedCustomMetadataSet().contains(IngestMetadata.TYPE)) {
+        if (event.metadataChanged() == false) {
+            return;
+        }
+
+        boolean hasIndicesChanges = event.previousState().metadata().indices().equals(event.state().metadata().indices()) == false;
+        boolean hasIngestPipelineChanges = event.changedCustomMetadataSet().contains(IngestMetadata.TYPE);
+
+        if (hasIngestPipelineChanges || hasIndicesChanges) {
             boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(event.state());
             if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) {
                 atLeastOneGeoipProcessor = true;
@@ -222,41 +233,112 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
         }
     }
 
-    @SuppressWarnings("unchecked")
     static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
-        List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
-        return pipelineDefinitions.stream().anyMatch(pipelineDefinition -> {
-            Map<String, Object> pipelineMap = pipelineDefinition.getConfigAsMap();
-            return hasAtLeastOneGeoipProcessor((List<Map<String, Object>>) pipelineMap.get(Pipeline.PROCESSORS_KEY));
+        if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
+            return true;
+        }
+
+        Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
+            .map(PipelineConfiguration::getId)
+            .collect(Collectors.toSet());
+
+        if (checkReferencedPipelines.isEmpty()) {
+            return false;
+        }
+
+        return clusterState.getMetadata().indices().values().stream().anyMatch(indexMetadata -> {
+            String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
+            String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
+            return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);
         });
     }
 
-    private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors) {
-        return processors != null && processors.stream().anyMatch(GeoIpDownloaderTaskExecutor::hasAtLeastOneGeoipProcessor);
+    /**
+     * Retrieve list of pipelines that have at least one geoip processor.
+     * @param clusterState Cluster state.
+     * @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
+     *                                           matching the param.
+     * @return A list of {@link PipelineConfiguration} matching criteria.
+     */
+    @SuppressWarnings("unchecked")
+    private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
+        ClusterState clusterState,
+        boolean downloadDatabaseOnPipelineCreation
+    ) {
+        List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
+        return pipelineDefinitions.stream().filter(pipelineConfig -> {
+            List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
+            return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
+        }).collect(Collectors.toList());
     }
 
-    private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor) {
-        return processor != null
-            && (processor.containsKey(GeoIpProcessor.TYPE)
-                || isProcessorWithOnFailureGeoIpProcessor(processor)
-                || isForeachProcessorWithGeoipProcessor(processor));
+    /**
+     * Check if a list of processor contains at least a geoip processor.
+     * @param processors List of processors.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @return true if a geoip processor is found in the processor list.
+     */
+    private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
+        return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
+    }
+
+    /**
+     * Check if a processor config is a geoip processor or contains at least a geoip processor.
+     * @param processor Processor config.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @return true if a geoip processor is found in the processor list.
+     */
+    @SuppressWarnings("unchecked")
+    private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
+        if (processor == null) {
+            return false;
+        }
+
+        if (processor.containsKey(GeoIpProcessor.TYPE)) {
+            Map<String, Object> processorConfig = (Map<String, Object>) processor.get(GeoIpProcessor.TYPE);
+            return downloadDatabaseOnPipelineCreation(processorConfig) == downloadDatabaseOnPipelineCreation;
+        }
+
+        return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
+            || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
     }
 
+    /**
+     * Check if a processor config is has an on_failure clause containing at least a geoip processor.
+     * @param processor Processor config.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @return true if a geoip processor is found in the processor list.
+     */
     @SuppressWarnings("unchecked")
-    private static boolean isProcessorWithOnFailureGeoIpProcessor(Map<String, Object> processor) {
+    private static boolean isProcessorWithOnFailureGeoIpProcessor(
+        Map<String, Object> processor,
+        boolean downloadDatabaseOnPipelineCreation
+    ) {
         return processor != null
             && processor.values()
                 .stream()
                 .anyMatch(
                     value -> value instanceof Map
-                        && hasAtLeastOneGeoipProcessor(((Map<String, List<Map<String, Object>>>) value).get("on_failure"))
+                        && hasAtLeastOneGeoipProcessor(
+                            ((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
+                            downloadDatabaseOnPipelineCreation
+                        )
                 );
     }
 
+    /**
+     * Check if a processor is a foreach processor containing at least a geoip processor.
+     * @param processor Processor config.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @return true if a geoip processor is found in the processor list.
+     */
     @SuppressWarnings("unchecked")
-    private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor) {
+    private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
         return processor.containsKey("foreach")
-            && hasAtLeastOneGeoipProcessor(((Map<String, Map<String, Object>>) processor.get("foreach")).get("processor"));
+            && hasAtLeastOneGeoipProcessor(
+                ((Map<String, Map<String, Object>>) processor.get("foreach")).get("processor"),
+                downloadDatabaseOnPipelineCreation
+            );
     }
 
     private void startTask(Runnable onFailure) {

+ 12 - 0
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

@@ -427,6 +427,10 @@ public final class GeoIpProcessor extends AbstractProcessor {
             boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
             boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true);
 
+            // Validating the download_database_on_pipeline_creation even if the result
+            // is not used directly by the factory.
+            downloadDatabaseOnPipelineCreation(config, processorTag);
+
             // noop, should be removed in 9.0
             Object value = config.remove("fallback_to_default_databases");
             if (value != null) {
@@ -487,6 +491,14 @@ public final class GeoIpProcessor extends AbstractProcessor {
             );
         }
 
+        public static boolean downloadDatabaseOnPipelineCreation(Map<String, Object> config) {
+            return downloadDatabaseOnPipelineCreation(config, null);
+        }
+
+        public static boolean downloadDatabaseOnPipelineCreation(Map<String, Object> config, String processorTag) {
+            return readBooleanProperty(GeoIpProcessor.TYPE, processorTag, config, "download_database_on_pipeline_creation", true);
+        }
+
         private static boolean useDatabaseUnavailableProcessor(GeoIpDatabase database, String databaseName) {
             // If there is no instance for a database we should fail with a config error, but
             // if there is no instance for a builtin database that we manage via GeoipDownloader then don't fail.

+ 68 - 39
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java

@@ -9,13 +9,20 @@
 package org.elasticsearch.ingest.geoip;
 
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.ingest.IngestMetadata;
 import org.elasticsearch.ingest.PipelineConfiguration;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xcontent.json.JsonXContent;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -24,13 +31,46 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
-    public void testHasAtLeastOneGeoipProcessor() {
+
+    public void testHasAtLeastOneGeoipProcessorWhenDownloadDatabaseOnPipelineCreationIsFalse() throws IOException {
+        ClusterState clusterState = mock(ClusterState.class);
+        Metadata metadata = mock(Metadata.class);
+        when(clusterState.getMetadata()).thenReturn(metadata);
+
+        final IngestMetadata[] ingestMetadata = new IngestMetadata[1];
+        when(metadata.custom(IngestMetadata.TYPE)).thenAnswer(invocationOnmock -> ingestMetadata[0]);
+
+        final Settings[] indexSettings = new Settings[1];
+        IndexMetadata indexMetadata = mock(IndexMetadata.class);
+        when(indexMetadata.getSettings()).thenAnswer(invocationMock -> indexSettings[0]);
+        when(metadata.indices()).thenReturn(Map.of("index", indexMetadata));
+
+        for (String pipelineConfigJson : getPipelinesWithGeoIpProcessors(false)) {
+            ingestMetadata[0] = new IngestMetadata(
+                Map.of("_id1", new PipelineConfiguration("_id1", new BytesArray(pipelineConfigJson), XContentType.JSON))
+            );
+            // The pipeline is not used in any index, expected to return false.
+            indexSettings[0] = Settings.EMPTY;
+            assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+
+            // The pipeline is set as default pipeline in an index, expected to return true.
+            indexSettings[0] = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "_id1").build();
+            assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+
+            // The pipeline is set as final pipeline in an index, expected to return true.
+            indexSettings[0] = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "_id1").build();
+            assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(clusterState));
+        }
+
+    }
+
+    public void testHasAtLeastOneGeoipProcessor() throws IOException {
         final IngestMetadata[] ingestMetadata = new IngestMetadata[1];
         ClusterState clusterState = mock(ClusterState.class);
         Metadata metadata = mock(Metadata.class);
         when(metadata.custom(IngestMetadata.TYPE)).thenAnswer(invocationOnmock -> ingestMetadata[0]);
         when(clusterState.getMetadata()).thenReturn(metadata);
-        List<String> expectHitsInputs = getPipelinesWithGeoIpProcessors();
+        List<String> expectHitsInputs = getPipelinesWithGeoIpProcessors(true);
         List<String> expectMissesInputs = getPipelinesWithoutGeoIpProcessors();
         {
             // Test that hasAtLeastOneGeoipProcessor returns true for any pipeline with a geoip processor:
@@ -73,15 +113,10 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
      * This method returns an assorted list of pipelines that have geoip processors -- ones that ought to cause hasAtLeastOneGeoipProcessor
      *  to return true.
      */
-    private List<String> getPipelinesWithGeoIpProcessors() {
+    private List<String> getPipelinesWithGeoIpProcessors(boolean downloadDatabaseOnPipelineCreation) throws IOException {
         String simpleGeoIpProcessor = """
             {
-              "processors":[
-                {
-                  "geoip":{
-                    "field":"provider"
-                  }
-                }
+              "processors":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """
               ]
             }
             """;
@@ -92,12 +127,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
                   "rename":{
                     "field":"provider",
                     "target_field":"cloud.provider",
-                    "on_failure":[
-                      {
-                        "geoip":{
-                          "field":"error.message"
-                        }
-                      }
+                    "on_failure":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """
                     ]
                   }
                 }
@@ -110,12 +140,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
                 {
                   "foreach":{
                     "field":"values",
-                    "processor":
-                      {
-                        "geoip":{
-                          "field":"someField"
-                        }
-                      }
+                    "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """
                   }
                 }
               ]
@@ -131,12 +156,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
                       {
                         "foreach":{
                           "field":"someField",
-                          "processor":
-                            {
-                              "geoip":{
-                                "field":"someField"
-                              }
-                            }
+                          "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """
                         }
                       }
                   }
@@ -159,12 +179,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
                               "rename":{
                                 "field":"provider",
                                 "target_field":"cloud.provider",
-                                "on_failure":[
-                                  {
-                                    "geoip":{
-                                      "field":"error.message"
-                                    }
-                                  }
+                                "on_failure":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """
                                 ]
                               }
                             }
@@ -186,12 +201,7 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
                       {
                         "foreach":{
                           "field":"values",
-                          "processor":
-                            {
-                              "geoip":{
-                                "field":"someField"
-                              }
-                            }
+                          "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """
                         }
                       }
                     ]
@@ -252,4 +262,23 @@ public class GeoIpDownloaderTaskExecutorTests extends ESTestCase {
             """;
         return List.of(empty, noProcessors, onFailureWithForeachWithSet);
     }
+
+    private String getGeoIpProcessor(boolean downloadDatabaseOnPipelineCreation) throws IOException {
+        try (XContentBuilder builder = JsonXContent.contentBuilder()) {
+            builder.startObject();
+            {
+                builder.startObject("geoip");
+                {
+                    builder.field("field", randomIdentifier());
+                    if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
+                        builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
+                    }
+                }
+                builder.endObject();
+            }
+            builder.endObject();
+
+            return Strings.toString(builder);
+        }
+    }
 }

+ 11 - 0
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java

@@ -45,6 +45,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
+import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.instanceOf;
@@ -407,6 +408,16 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         assertWarnings(GeoIpProcessor.DEFAULT_DATABASES_DEPRECATION_MESSAGE);
     }
 
+    public void testDownloadDatabaseOnPipelineCreation() throws IOException {
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService);
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", randomIdentifier());
+        config.put("download_database_on_pipeline_creation", randomBoolean());
+        factory.create(null, null, null, config);
+        // Check all the config params were consumed.
+        assertThat(config, anEmptyMap());
+    }
+
     public void testDefaultDatabaseWithTaskPresent() throws Exception {
         PersistentTasksCustomMetadata tasks = PersistentTasksCustomMetadata.builder()
             .addTask(GeoIpDownloader.GEOIP_DOWNLOADER, GeoIpDownloader.GEOIP_DOWNLOADER, null, null)

+ 8 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-final_pipeline.json

@@ -57,6 +57,14 @@
         "ignore_missing": true
       }
     },
+    {
+      "geoip": {
+        "field": "session.ip",
+        "target_field": "session.location",
+        "ignore_missing": true,
+        "download_database_on_pipeline_creation": false
+      }
+    },
     {
       "remove": {
         "field": "session.ip",

+ 4 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/entsearch/analytics/behavioral_analytics-events-mappings.json

@@ -259,6 +259,10 @@
               }
             }
           }
+        },
+        "tags": {
+          "ignore_above": 1024,
+          "type": "keyword"
         }
       }
     }

+ 0 - 4
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.upgrades;
 
 import org.apache.http.util.EntityUtils;
-import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.hamcrest.Matchers;
@@ -17,10 +16,7 @@ import java.nio.charset.StandardCharsets;
 
 public class GeoIpUpgradeIT extends AbstractUpgradeTestCase {
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/96129")
     public void testGeoIpDownloader() throws Exception {
-        assumeTrue("Disabled until PR #95621 is backported to branch " + Version.V_8_8_0, UPGRADE_FROM_VERSION.onOrBefore(Version.V_8_7_0));
-
         if (CLUSTER_TYPE == ClusterType.UPGRADED) {
             assertBusy(() -> {
                 Response response = client().performRequest(new Request("GET", "_cat/tasks"));