Explorar o código

Remove default maxmind geoip databases from distribution (#78362)

* Adjusted integration tests to use geoip test fixture or to use test databases provided via config dirs (for qa module / docs).
* Kept the geolite2-databases dependency for most of the unit tests only.
* Made fallback_to_default_databases parameter on geoip processor a noop and emit deprecation warning upon using it.
* If no geoip databases are available yet to a node then the geoip processor factory returns a processor implementation that flags documents that databases are unavailable. This allows these documents to be reindex later with a pipeline. These documents will have a tag string array field, which contains a string _geoip_database_unavailable_{database_name} for each missing database in a pipeline.
* Added reload pipeline capabilities is IngestService, so that when databases are available again on a node then pipelines with geoip processor definition can be reloaded.

Relates to #68920
Martijn van Groningen %!s(int64=4) %!d(string=hai) anos
pai
achega
04e5823a69
Modificáronse 26 ficheiros con 715 adicións e 339 borrados
  1. 7 2
      docs/build.gradle
  2. 12 12
      docs/reference/ingest/common-log-format-example.asciidoc
  3. 26 20
      docs/reference/ingest/processors/geoip.asciidoc
  4. 28 0
      docs/reference/migration/migrate_8_0/ingest.asciidoc
  5. BIN=BIN
      docs/src/test/resources/GeoLite2-City.mmdb
  6. BIN=BIN
      docs/src/test/resources/GeoLite2-Country.mmdb
  7. 16 8
      modules/ingest-geoip/build.gradle
  8. 30 13
      modules/ingest-geoip/qa/file-based-update/src/test/java/org/elasticsearch/ingest/geoip/UpdateDatabasesIT.java
  9. 73 0
      modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java
  10. 7 8
      modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java
  11. 26 3
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java
  12. 43 5
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java
  13. 7 60
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java
  14. 39 6
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java
  15. 2 1
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java
  16. 49 11
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java
  17. 107 72
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java
  18. 28 41
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java
  19. 2 1
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java
  20. 29 0
      modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java
  21. 63 63
      modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml
  22. 5 0
      qa/smoke-test-ingest-with-all-dependencies/build.gradle
  23. BIN=BIN
      qa/smoke-test-ingest-with-all-dependencies/src/test/resources/GeoLite2-City.mmdb
  24. 8 8
      qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yml
  25. 26 2
      server/src/main/java/org/elasticsearch/ingest/IngestService.java
  26. 82 3
      server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

+ 7 - 2
docs/build.gradle

@@ -63,8 +63,6 @@ testClusters.matching { it.name == "integTest"}.configureEach {
   if (singleNode().testDistribution == DEFAULT) {
     setting 'xpack.license.self_generated.type', 'trial'
     setting 'indices.lifecycle.history_index_enabled', 'false'
-    setting 'ingest.geoip.downloader.enabled', 'false'
-    systemProperty 'es.geoip_v2_feature_flag_enabled', 'true'
     keystorePassword 'keystore-password'
   }
 
@@ -89,6 +87,9 @@ testClusters.matching { it.name == "integTest"}.configureEach {
   // Whitelist reindexing from the local node so we can test it.
   setting 'reindex.remote.whitelist', '127.0.0.1:*'
 
+  extraConfigFile 'ingest-geoip/GeoLite2-City.mmdb', file("${project.projectDir}/src/test/resources/GeoLite2-City.mmdb")
+  extraConfigFile 'ingest-geoip/GeoLite2-Country.mmdb', file("${project.projectDir}/src/test/resources/GeoLite2-Country.mmdb")
+
   // TODO: remove this once cname is prepended to transport.publish_address by default in 8.0
   systemProperty 'es.transport.cname_in_publish_address', 'true'
 
@@ -114,6 +115,10 @@ tasks.named("integTest").configure {
   }
 }
 
+tasks.named("forbiddenPatterns").configure {
+  exclude '**/*.mmdb'
+}
+
 tasks.named("buildRestTests").configure {
   docs = docsFileTree
 }

+ 12 - 12
docs/reference/ingest/common-log-format-example.asciidoc

@@ -175,7 +175,7 @@ PUT _index_template/my-data-stream-template
 ----
 POST my-data-stream/_doc?pipeline=my-pipeline
 {
-  "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
+  "message": "89.160.20.128 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
 }
 ----
 // TEST[s/my-pipeline/my-pipeline&refresh=wait_for/]
@@ -216,21 +216,21 @@ The API returns:
             "version": "1.1"
           },
           "source": {
-            "ip": "212.87.37.154",
+            "ip": "89.160.20.128",
             "geo": {
-              "continent_name": "Europe",
-              "region_iso_code": "DE-BE",
-              "city_name": "Berlin",
-              "country_iso_code": "DE",
-              "country_name": "Germany",
-              "region_name": "Land Berlin",
-              "location": {
-                "lon": 13.4978,
-                "lat": 52.411
+              "continent_name" : "Europe",
+              "country_name" : "Sweden",
+              "country_iso_code" : "SE",
+              "city_name" : "Linköping",
+              "region_iso_code" : "SE-E",
+              "region_name" : "Östergötland County",
+              "location" : {
+                "lon" : 15.6167,
+                "lat" : 58.4167
               }
             }
           },
-          "message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"",
+          "message": "89.160.20.128 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"",
           "url": {
             "original": "/favicon.ico"
           },

+ 26 - 20
docs/reference/ingest/processors/geoip.asciidoc

@@ -69,7 +69,7 @@ PUT _ingest/pipeline/geoip
 }
 PUT my-index-000001/_doc/my_id?pipeline=geoip
 {
-  "ip": "8.8.8.8"
+  "ip": "89.160.20.128"
 }
 GET my-index-000001/_doc/my_id
 --------------------------------------------------
@@ -86,12 +86,15 @@ Which returns:
   "_seq_no": 55,
   "_primary_term": 1,
   "_source": {
-    "ip": "8.8.8.8",
+    "ip": "89.160.20.128",
     "geoip": {
-      "continent_name": "North America",
-      "country_name": "United States",
-      "country_iso_code": "US",
-      "location": { "lat": 37.751, "lon": -97.822 }
+      "continent_name": "Europe",
+      "country_name": "Sweden",
+      "country_iso_code": "SE",
+      "city_name" : "Linköping",
+      "region_iso_code" : "SE-E",
+      "region_name" : "Östergötland County",
+      "location": { "lat": 58.4167, "lon": 15.6167 }
     }
   }
 }
@@ -119,7 +122,7 @@ PUT _ingest/pipeline/geoip
 }
 PUT my-index-000001/_doc/my_id?pipeline=geoip
 {
-  "ip": "8.8.8.8"
+  "ip": "89.160.20.128"
 }
 GET my-index-000001/_doc/my_id
 --------------------------------------------------
@@ -136,11 +139,11 @@ returns this:
   "_seq_no": 65,
   "_primary_term": 1,
   "_source": {
-    "ip": "8.8.8.8",
+    "ip": "89.160.20.128",
     "geo": {
-      "continent_name": "North America",
-      "country_name": "United States",
-      "country_iso_code": "US"
+      "continent_name": "Europe",
+      "country_name": "Sweden",
+      "country_iso_code": "SE"
     }
   }
 }
@@ -236,7 +239,7 @@ PUT _ingest/pipeline/geoip
 
 PUT my_ip_locations/_doc/1?refresh=true&pipeline=geoip
 {
-  "ip": "8.8.8.8"
+  "ip": "89.160.20.128"
 }
 
 GET /my_ip_locations/_search
@@ -250,8 +253,8 @@ GET /my_ip_locations/_search
         "geo_distance": {
           "distance": "1m",
           "geoip.location": {
-            "lon": -97.822,
-            "lat": 37.751
+            "lon": 15.6167,
+            "lat": 58.4167
           }
         }
       }
@@ -285,15 +288,18 @@ GET /my_ip_locations/_search
         "_score" : 1.0,
         "_source" : {
           "geoip" : {
-            "continent_name" : "North America",
-            "country_name" : "United States",
-            "country_iso_code" : "US",
+            "continent_name" : "Europe",
+            "country_name" : "Sweden",
+            "country_iso_code" : "SE",
+            "city_name" : "Linköping",
+            "region_iso_code" : "SE-E",
+            "region_name" : "Östergötland County",
             "location" : {
-              "lon" : -97.822,
-              "lat" : 37.751
+              "lon" : 15.6167,
+              "lat" : 58.4167
             }
           },
-          "ip" : "8.8.8.8"
+          "ip" : "89.160.20.128"
         }
       }
     ]

+ 28 - 0
docs/reference/migration/migrate_8_0/ingest.asciidoc

@@ -18,4 +18,32 @@ Common Schema (ECS)] fields, regardless of the `ecs` value.
 To avoid deprecation warnings, remove the parameter from your ingest pipelines.
 If a pipeline specifies an `ecs` value, the value is ignored.
 ====
+
+.The default Maxmind geoip databases have been removed.
+[%collapsible]
+====
+*Details* +
+The default Maxmind geoip databases that shipped by default with Elasticsearch
+have been removed. These databases are out dated and stale and using these
+databases will likely result in incorrect geoip lookups.
+
+By default since 7.13, these pre-packaged geoip databases
+were used in case no database were specified in the config directory or before
+the geoip downloader downloaded the geoip databases. When the geoip database
+downloader completed downloading the new databases then these pre-packaged
+databases were no longer used.
+
+*Impact* +
+If the geoip downloader is disabled and no geoip databases are provided
+in the config directory of each ingest node then the geoip processor will
+no longer perform geoip lookups and tag these documents with the fact that
+the requested database is no longer available.
+
+After a cluster has been started and before the geoip downloader has completed
+downloading the most up to data databases, the geoip processor will not perform
+any geoip lookups and tag documents that the requested database is not available.
+After the geoip downloader has completed downloading the most up to data databases
+then the geoip processor will function as normal. The window of time that the
+geoip processor can't do geoip lookups after cluster startup should be very small.
+====
 //end::notable-breaking-changes[]

BIN=BIN
docs/src/test/resources/GeoLite2-City.mmdb


BIN=BIN
docs/src/test/resources/GeoLite2-Country.mmdb


+ 16 - 8
modules/ingest-geoip/build.gradle

@@ -13,7 +13,7 @@ apply plugin: 'elasticsearch.yaml-rest-compat-test'
 apply plugin: 'elasticsearch.internal-cluster-test'
 
 esplugin {
-  description 'Ingest processor that uses lookup geo data based on IP adresses using the MaxMind geo database'
+  description 'Ingest processor that uses lookup geo data based on IP addresses using the MaxMind geo database'
   classname 'org.elasticsearch.ingest.geoip.IngestGeoIpPlugin'
 }
 
@@ -57,14 +57,7 @@ tasks.named("internalClusterTest").configure {
   }
 }
 
-tasks.register("copyDefaultGeoIp2DatabaseFiles", Copy) {
-  from { zipTree(configurations.testCompileClasspath.files.find { it.name.contains('geolite2-databases') }) }
-  into "${project.buildDir}/ingest-geoip"
-  include "*.mmdb"
-}
-
 tasks.named("bundlePlugin").configure {
-  dependsOn("copyDefaultGeoIp2DatabaseFiles")
   from("${project.buildDir}/ingest-geoip") {
     into '/'
   }
@@ -107,3 +100,18 @@ tasks.named("dependencyLicenses").configure {
   mapping from: /maxmind-db.*/, to: 'maxmind-db-reader'
   ignoreFile 'elastic-geoip-database-service-agreement-LICENSE.txt'
 }
+
+testClusters.configureEach {
+  // Needed for database downloader, uses delete-by-query to cleanup old databases from org.elasticsearch.ingest.geoip database system index
+  module ':modules:reindex'
+  // Downloader is enabled by default, but in test clusters in build disabled by default,
+  // but in this module, the downloader should be enabled by default
+  systemProperty 'ingest.geoip.downloader.enabled.default', 'true'
+  if (useFixture) {
+    setting 'ingest.geoip.downloader.endpoint', { "${-> fixtureAddress()}" }
+  }
+}
+
+tasks.named("yamlRestTestV7CompatTransform").configure { task ->
+  task.skipTestsByFilePattern("**/ingest_geoip/20_geoip_processor.yml", "from 8.0 yaml rest tests use geoip test fixture and default geoip are no longer packaged. In 7.x yaml tests used default databases which makes tests results very different, so skipping these tests")
+}

+ 30 - 13
modules/ingest-geoip/qa/file-based-update/src/test/java/org/elasticsearch/ingest/geoip/UpdateDatabasesIT.java

@@ -7,25 +7,25 @@
  */
 package org.elasticsearch.ingest.geoip;
 
-import org.apache.http.util.EntityUtils;
 import org.elasticsearch.client.Request;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.xcontent.ObjectPath;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.test.rest.ESRestTestCase;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class UpdateDatabasesIT extends ESRestTestCase {
 
@@ -35,8 +35,14 @@ public class UpdateDatabasesIT extends ESRestTestCase {
         Request simulatePipelineRequest = new Request("POST", "/_ingest/pipeline/_simulate");
         simulatePipelineRequest.setJsonEntity(body);
         {
-            Map<String, Object> response = toMap(client().performRequest(simulatePipelineRequest));
-            assertThat(ObjectPath.eval("docs.0.doc._source.geoip.city_name", response), equalTo("Tumba"));
+            Map<String, Object> response = entityAsMap(client().performRequest(simulatePipelineRequest));
+            assertThat(ObjectPath.eval("docs.0.doc._source.tags.0", response), equalTo("_geoip_database_unavailable_GeoLite2-City.mmdb"));
+        }
+
+        // Ensure no config databases have been setup:
+        {
+            Map<?, ?> stats = getGeoIpStatsForSingleNode();
+            assertThat(stats, nullValue());
         }
 
         Path configPath = PathUtils.get(System.getProperty("tests.config.dir"));
@@ -46,14 +52,25 @@ public class UpdateDatabasesIT extends ESRestTestCase {
         Files.copy(UpdateDatabasesIT.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
             ingestGeoipDatabaseDir.resolve("GeoLite2-City.mmdb"));
 
-        assertBusy(() -> {
-            Map<String, Object> response = toMap(client().performRequest(simulatePipelineRequest));
-            assertThat(ObjectPath.eval("docs.0.doc._source.geoip.city_name", response), equalTo("Linköping"));
-        });
+        // Ensure that a config database has been setup:
+        {
+            assertBusy(() -> {
+                Map<?, ?> stats = getGeoIpStatsForSingleNode();
+                assertThat(stats, notNullValue());
+                assertThat(stats.get("config_databases"), equalTo(List.of("GeoLite2-City.mmdb")));
+            });
+        }
+
+        Map<String, Object> response = entityAsMap(client().performRequest(simulatePipelineRequest));
+        assertThat(ObjectPath.eval("docs.0.doc._source.geoip.city_name", response), equalTo("Linköping"));
     }
 
-    private static Map<String, Object> toMap(Response response) throws IOException {
-        return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
+    private static Map<?, ?> getGeoIpStatsForSingleNode() throws IOException {
+        Request request = new Request("GET", "/_ingest/geoip/stats");
+        Map<String, Object> response = entityAsMap(client().performRequest(request));
+        Map<?, ?> nodes = (Map<?, ?>) response.get("nodes");
+        assertThat(nodes.size(), either(equalTo(0)).or(equalTo(1)));
+        return nodes.isEmpty() ? null : (Map<?, ?>) nodes.values().iterator().next();
     }
 
     @Override

+ 73 - 0
modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

@@ -28,6 +28,7 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.MatchQueryBuilder;
 import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStatsAction;
 import org.elasticsearch.reindex.ReindexPlugin;
 import org.elasticsearch.persistent.PersistentTaskParams;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@@ -40,6 +41,7 @@ import org.junit.After;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -48,10 +50,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import java.util.zip.GZIPInputStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -60,10 +64,13 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
 public class GeoIpDownloaderIT extends AbstractGeoIpIT {
@@ -259,6 +266,7 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
     @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
     public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
         assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
+        setupDatabasesInConfigDirectory();
         // setup:
         putPipeline();
 
@@ -303,6 +311,41 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
         });
     }
 
+    public void testStartWithNoDatabases() throws Exception {
+        assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
+        putPipeline();
+
+        // Behaviour without any databases loaded:
+        {
+            SimulateDocumentBaseResult result = simulatePipeline();
+            assertThat(result.getFailure(), nullValue());
+            assertThat(result.getIngestDocument(), notNullValue());
+            Map<String, Object> source = result.getIngestDocument().getSourceAndMetadata();
+            assertThat(source, hasEntry("tags", List.of("_geoip_database_unavailable_GeoLite2-City.mmdb",
+                "_geoip_database_unavailable_GeoLite2-Country.mmdb", "_geoip_database_unavailable_GeoLite2-ASN.mmdb")));
+        }
+
+        // Enable downloader:
+        Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
+        {
+            assertBusy(() -> {
+                SimulateDocumentBaseResult result = simulatePipeline();
+                assertThat(result.getFailure(), nullValue());
+                assertThat(result.getIngestDocument(), notNullValue());
+                Map<?, ?> source = result.getIngestDocument().getSourceAndMetadata();
+                assertThat(source, not(hasKey("tags")));
+                assertThat(source, hasKey("ip-city"));
+                assertThat(source, hasKey("ip-asn"));
+                assertThat(source, hasKey("ip-country"));
+
+                assertThat(((Map<?, ?>) source.get("ip-city")).get("city_name"), equalTo("Linköping"));
+                assertThat(((Map<?, ?>) source.get("ip-asn")).get("organization_name"), equalTo("Bredband2 AB"));
+                assertThat(((Map<?, ?>) source.get("ip-country")).get("country_name"), equalTo("Sweden"));
+            });
+        }
+    }
+
     private void verifyUpdatedDatabase() throws Exception {
         assertBusy(() -> {
             SimulateDocumentBaseResult result = simulatePipeline();
@@ -412,6 +455,36 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
         return geoipTmpDirs;
     }
 
+    private void setupDatabasesInConfigDirectory() throws Exception {
+        StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
+            .map(Environment::configFile)
+            .map(path -> path.resolve("ingest-geoip"))
+            .distinct()
+            .forEach(path -> {
+                try {
+                    Files.createDirectories(path);
+                    Files.copy(GeoIpDownloaderIT.class.getResourceAsStream("/GeoLite2-City.mmdb"),
+                        path.resolve("GeoLite2-City.mmdb"));
+                    Files.copy(GeoIpDownloaderIT.class.getResourceAsStream("/GeoLite2-ASN.mmdb"),
+                        path.resolve("GeoLite2-ASN.mmdb"));
+                    Files.copy(GeoIpDownloaderIT.class.getResourceAsStream("/GeoLite2-Country.mmdb"),
+                        path.resolve("GeoLite2-Country.mmdb"));
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            });
+
+        assertBusy(() -> {
+            GeoIpDownloaderStatsAction.Response  response =
+                client().execute(GeoIpDownloaderStatsAction.INSTANCE, new GeoIpDownloaderStatsAction.Request()).actionGet();
+            assertThat(response.getNodes(), not(empty()));
+            for (GeoIpDownloaderStatsAction.NodeResponse nodeResponse : response.getNodes()) {
+                assertThat(nodeResponse.getConfigDatabases(),
+                    containsInAnyOrder("GeoLite2-Country.mmdb", "GeoLite2-City.mmdb", "GeoLite2-ASN.mmdb"));
+            }
+        });
+    }
+
     @SuppressForbidden(reason = "Maxmind API requires java.io.File")
     private void parseDatabase(Path tempFile) throws IOException {
         try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {

+ 7 - 8
modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java

@@ -56,10 +56,9 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
      * geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance
      */
     public void test() throws Exception {
-        Path geoIpModulesDir = createTempDir();
         Path geoIpConfigDir = createTempDir();
         Path geoIpTmpDir = createTempDir();
-        DatabaseRegistry databaseRegistry = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
+        DatabaseRegistry databaseRegistry = createRegistry(geoIpConfigDir, geoIpTmpDir);
         ClusterService clusterService = mock(ClusterService.class);
         when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
         GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
@@ -71,8 +70,8 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
         databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
         lazyLoadReaders(databaseRegistry);
 
-        final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
-        final GeoIpProcessor processor2 = factory.create(null, "_tag", null,
+        final GeoIpProcessor processor1 = (GeoIpProcessor) factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
+        final GeoIpProcessor processor2 = (GeoIpProcessor) factory.create(null, "_tag", null,
             new HashMap<>(Map.of("field", "_field", "database_file", "GeoLite2-City-Test.mmdb")));
 
         final AtomicBoolean completed = new AtomicBoolean(false);
@@ -164,13 +163,13 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
             assertThat(lazyLoader.current(), equalTo(0));
         }
         // Avoid accumulating many temp dirs while running with -Dtests.iters=X
-        IOUtils.rm(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
+        IOUtils.rm(geoIpConfigDir, geoIpTmpDir);
     }
 
-    private static DatabaseRegistry createRegistry(Path geoIpModulesDir, Path geoIpConfigDir, Path geoIpTmpDir) throws IOException {
-        copyDatabaseFiles(geoIpModulesDir);
+    private static DatabaseRegistry createRegistry(Path geoIpConfigDir, Path geoIpTmpDir) throws IOException {
         GeoIpCache cache = new GeoIpCache(0);
-        LocalDatabases localDatabases = new LocalDatabases(geoIpModulesDir, geoIpConfigDir, cache);
+        LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, cache);
+        copyDatabaseFiles(geoIpConfigDir, localDatabases);
         DatabaseRegistry databaseRegistry =
             new DatabaseRegistry(geoIpTmpDir, mock(Client.class), cache, localDatabases, Runnable::run);
         databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));

+ 26 - 3
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java

@@ -54,6 +54,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
@@ -87,6 +88,7 @@ public final class DatabaseRegistry implements Closeable {
     private Path geoipTmpDirectory;
     private final LocalDatabases localDatabases;
     private final Consumer<Runnable> genericExecutor;
+    private IngestService ingestService;
 
     private final ConcurrentMap<String, DatabaseReaderLazyLoader> databases = new ConcurrentHashMap<>();
 
@@ -150,14 +152,15 @@ public final class DatabaseRegistry implements Closeable {
         }
         LOGGER.info("initialized database registry, using geoip-databases directory [{}]", geoipTmpDirectory);
         ingestService.addIngestClusterStateListener(this::checkDatabases);
+        this.ingestService = ingestService;
     }
 
-    public DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDefaultDatabases) {
+    public DatabaseReaderLazyLoader getDatabase(String name) {
         // There is a need for reference counting in order to avoid using an instance
         // that gets closed while using it. (this can happen during a database update)
         while (true) {
             DatabaseReaderLazyLoader instance =
-                databases.getOrDefault(name, localDatabases.getDatabase(name, fallbackUsingDefaultDatabases));
+                databases.getOrDefault(name, localDatabases.getDatabase(name));
             if (instance == null || instance.preLookup()) {
                 return instance;
             }
@@ -167,7 +170,7 @@ public final class DatabaseRegistry implements Closeable {
     }
 
     List<DatabaseReaderLazyLoader> getAllDatabases() {
-        List<DatabaseReaderLazyLoader> all = new ArrayList<>(localDatabases.getAllDatabases());
+        List<DatabaseReaderLazyLoader> all = new ArrayList<>(localDatabases.getConfigDatabases().values());
         this.databases.forEach((key, value) -> all.add(value));
         return all;
     }
@@ -313,6 +316,22 @@ public final class DatabaseRegistry implements Closeable {
             DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
             if (existing != null) {
                 existing.close();
+            } else {
+                // Loaded a database for the first time, so reload pipelines for which a database was not available:
+                Predicate<GeoIpProcessor.DatabaseUnavailableProcessor> predicate = p -> databaseFileName.equals(p.getDatabaseName());
+                var ids = ingestService.getPipelineWithProcessorType(GeoIpProcessor.DatabaseUnavailableProcessor.class, predicate);
+                if (ids.isEmpty() == false) {
+                    for (var id : ids) {
+                        try {
+                            ingestService.reloadPipeline(id);
+                            LOGGER.debug("successfully reloaded pipeline [{}] after downloading of database [{}] for the first time",
+                                id, databaseFileName);
+                        } catch (Exception e) {
+                            LOGGER.debug((Supplier<?>) () -> new ParameterizedMessage(
+                                "failed to reload pipeline [{}] after downloading of database [{}]", id, databaseFileName), e);
+                        }
+                    }
+                }
             }
             LOGGER.info("successfully reloaded changed geoip database file [{}]", file);
         } catch (Exception e) {
@@ -384,6 +403,10 @@ public final class DatabaseRegistry implements Closeable {
         return Set.copyOf(databases.keySet());
     }
 
+    public Set<String> getConfigDatabases() {
+        return localDatabases.getConfigDatabases().keySet();
+    }
+
     public Set<String> getFilesInTemp() {
         try (Stream<Path> files = Files.list(geoipTmpDirectory)) {
             return files.map(Path::getFileName).map(Path::toString).collect(Collectors.toSet());

+ 43 - 5
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

@@ -23,6 +23,8 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedSupplier;
+import org.elasticsearch.common.logging.DeprecationCategory;
+import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.common.network.NetworkAddress;
@@ -52,6 +54,10 @@ import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTask
 
 public final class GeoIpProcessor extends AbstractProcessor {
 
+    private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(GeoIpProcessor.class);
+    static final String DEFAULT_DATABASES_DEPRECATION_MESSAGE = "the [fallback_to_default_databases] has been deprecated," +
+        " because Elasticsearch no longer includes the default Maxmind geoip databases. This setting will be removed in Elasticsearch 9.0";
+
     public static final String TYPE = "geoip";
     private static final String CITY_DB_SUFFIX = "-City";
     private static final String COUNTRY_DB_SUFFIX = "-Country";
@@ -369,7 +375,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
         }
 
         @Override
-        public GeoIpProcessor create(
+        public Processor create(
             final Map<String, Processor.Factory> registry,
             final String processorTag,
             final String description, final Map<String, Object> config) throws IOException {
@@ -379,12 +385,19 @@ public final class GeoIpProcessor extends AbstractProcessor {
             List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
             boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
             boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true);
-            boolean fallbackUsingDefaultDatabases = readBooleanProperty(TYPE, processorTag, config, "fallback_to_default_databases", true);
 
-            DatabaseReaderLazyLoader lazyLoader = databaseRegistry.getDatabase(databaseFile, fallbackUsingDefaultDatabases);
-            if (lazyLoader == null) {
+            // noop, should be removed in 9.0
+            Object value = config.remove("fallback_to_default_databases");
+            if (value != null) {
+                DEPRECATION_LOGGER.critical(DeprecationCategory.OTHER, "default_databases_message", DEFAULT_DATABASES_DEPRECATION_MESSAGE);
+            }
+
+            DatabaseReaderLazyLoader lazyLoader = databaseRegistry.getDatabase(databaseFile);
+            if (lazyLoader == null && databaseRegistry.getAvailableDatabases().isEmpty() == false) {
                 throw newConfigurationException(TYPE, processorTag,
                     "database_file", "database file [" + databaseFile + "] doesn't exist");
+            } else if (lazyLoader == null && databaseRegistry.getAvailableDatabases().isEmpty()) {
+                return new DatabaseUnavailableProcessor(processorTag, description, databaseFile);
             }
             final String databaseType;
             try {
@@ -417,7 +430,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
                 }
             }
             CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier = () -> {
-                DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile, fallbackUsingDefaultDatabases);
+                DatabaseReaderLazyLoader loader = databaseRegistry.getDatabase(databaseFile);
                 if (loader == null) {
                     throw new ResourceNotFoundException("database file [" + databaseFile + "] doesn't exist");
                 }
@@ -518,4 +531,29 @@ public final class GeoIpProcessor extends AbstractProcessor {
             }
         }
     }
+
+    static class DatabaseUnavailableProcessor extends AbstractProcessor {
+
+        private final String databaseName;
+
+        DatabaseUnavailableProcessor(String tag, String description, String databaseName) {
+            super(tag, description);
+            this.databaseName = databaseName;
+        }
+
+        @Override
+        public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+            ingestDocument.appendFieldValue("tags", "_geoip_database_unavailable_" + databaseName, true);
+            return ingestDocument;
+        }
+
+        @Override
+        public String getType() {
+            return TYPE;
+        }
+
+        public String getDatabaseName() {
+            return databaseName;
+        }
+    }
 }

+ 7 - 60
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java

@@ -11,8 +11,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
-import org.elasticsearch.core.SuppressForbidden;
-import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.watcher.FileChangesListener;
 import org.elasticsearch.watcher.FileWatcher;
@@ -23,23 +21,17 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.PathMatcher;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Stream;
 
-import static org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES;
-
 /**
- * Keeps track of the databases locally available to a node:
- * 1) Default databases shipped with the default distribution via ingest-geoip module
- * 2) User provided databases from the ES_HOME/config/ingest-geoip directory. This directory is monitored
- *    and files updates are picked up and may cause databases being loaded or removed at runtime.
+ * Keeps track of user provided databases in the ES_HOME/config/ingest-geoip directory.
+ * This directory is monitored and files updates are picked up and may cause databases being loaded or removed at runtime.
  */
 final class LocalDatabases implements Closeable {
 
@@ -48,28 +40,16 @@ final class LocalDatabases implements Closeable {
     private final GeoIpCache cache;
     private final Path geoipConfigDir;
 
-    private final Map<String, DatabaseReaderLazyLoader> defaultDatabases;
     private final ConcurrentMap<String, DatabaseReaderLazyLoader> configDatabases;
 
     LocalDatabases(Environment environment, GeoIpCache cache) {
-        this(
-            // In GeoIpProcessorNonIngestNodeTests, ingest-geoip is loaded on the classpath.
-            // This means that the plugin is never unbundled into a directory where the database files would live.
-            // Therefore, we have to copy these database files ourselves. To do this, we need the ability to specify where
-            // those database files would go. We do this by adding a plugin that registers ingest.geoip.database_path as an
-            // actual setting. Otherwise, in production code, this setting is not registered and the database path is not configurable.
-            environment.settings().get("ingest.geoip.database_path") != null ?
-                getGeoipConfigDirectory(environment) :
-                environment.modulesFile().resolve("ingest-geoip"),
-            environment.configFile().resolve("ingest-geoip"),
-            cache);
+        this(environment.configFile().resolve("ingest-geoip"), cache);
     }
 
-    LocalDatabases(Path geoipModuleDir, Path geoipConfigDir, GeoIpCache cache) {
+    LocalDatabases(Path geoipConfigDir, GeoIpCache cache) {
         this.cache = cache;
         this.geoipConfigDir = geoipConfigDir;
         this.configDatabases = new ConcurrentHashMap<>();
-        this.defaultDatabases = initDefaultDatabases(geoipModuleDir);
     }
 
     void initialize(ResourceWatcherService resourceWatcher) throws IOException {
@@ -79,22 +59,11 @@ final class LocalDatabases implements Closeable {
         watcher.addListener(new GeoipDirectoryListener());
         resourceWatcher.add(watcher, ResourceWatcherService.Frequency.HIGH);
 
-        LOGGER.info("initialized default databases [{}], config databases [{}] and watching [{}] for changes",
-            defaultDatabases.keySet(), configDatabases.keySet(), geoipConfigDir);
-    }
-
-    DatabaseReaderLazyLoader getDatabase(String name, boolean fallbackUsingDefaultDatabases) {
-        return configDatabases.getOrDefault(name, fallbackUsingDefaultDatabases ? defaultDatabases.get(name) : null);
-    }
-
-    List<DatabaseReaderLazyLoader> getAllDatabases() {
-        List<DatabaseReaderLazyLoader> all = new ArrayList<>(defaultDatabases.values());
-        all.addAll(configDatabases.values());
-        return all;
+        LOGGER.info("initialized config databases [{}] and watching [{}] for changes", configDatabases.keySet(), geoipConfigDir);
     }
 
-    Map<String, DatabaseReaderLazyLoader> getDefaultDatabases() {
-        return defaultDatabases;
+    DatabaseReaderLazyLoader getDatabase(String name) {
+        return configDatabases.get(name);
     }
 
     Map<String, DatabaseReaderLazyLoader> getConfigDatabases() {
@@ -122,20 +91,6 @@ final class LocalDatabases implements Closeable {
         }
     }
 
-    Map<String, DatabaseReaderLazyLoader> initDefaultDatabases(Path geoipModuleDir) {
-        Map<String, DatabaseReaderLazyLoader> databases = new HashMap<>(DEFAULT_DATABASE_FILENAMES.length);
-
-        for (String filename : DEFAULT_DATABASE_FILENAMES) {
-            Path source = geoipModuleDir.resolve(filename);
-            assert Files.exists(source);
-            String databaseFileName = source.getFileName().toString();
-            DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, source, null);
-            databases.put(databaseFileName, loader);
-        }
-
-        return Collections.unmodifiableMap(databases);
-    }
-
     Map<String, DatabaseReaderLazyLoader> initConfigDatabases(Path geoipConfigDir) throws IOException {
         Map<String, DatabaseReaderLazyLoader> databases = new HashMap<>();
 
@@ -161,19 +116,11 @@ final class LocalDatabases implements Closeable {
 
     @Override
     public void close() throws IOException {
-        for (DatabaseReaderLazyLoader lazyLoader : defaultDatabases.values()) {
-            lazyLoader.close();
-        }
         for (DatabaseReaderLazyLoader lazyLoader : configDatabases.values()) {
             lazyLoader.close();
         }
     }
 
-    @SuppressForbidden(reason = "PathUtils#get")
-    private static Path getGeoipConfigDirectory(Environment environment) {
-        return PathUtils.get(environment.settings().get("ingest.geoip.database_path"));
-    }
-
     private class GeoipDirectoryListener implements FileChangesListener {
 
         @Override

+ 39 - 6
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsAction.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.ingest.geoip.stats;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
@@ -91,6 +92,10 @@ public class GeoIpDownloaderStatsAction extends ActionType<GeoIpDownloaderStatsA
             super(clusterName, nodes, failures);
         }
 
+        public GeoIpDownloaderStats getStats() {
+            return getNodes().stream().map(n -> n.stats).filter(Objects::nonNull).findFirst().orElse(GeoIpDownloaderStats.EMPTY);
+        }
+
         @Override
         protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
             return in.readList(NodeResponse::new);
@@ -103,14 +108,13 @@ public class GeoIpDownloaderStatsAction extends ActionType<GeoIpDownloaderStatsA
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            GeoIpDownloaderStats stats =
-                getNodes().stream().map(n -> n.stats).filter(Objects::nonNull).findFirst().orElse(GeoIpDownloaderStats.EMPTY);
+            GeoIpDownloaderStats stats = getStats();
             builder.startObject();
             builder.field("stats", stats);
             builder.startObject("nodes");
             for (Map.Entry<String, NodeResponse> e : getNodesMap().entrySet()) {
                 NodeResponse response = e.getValue();
-                if (response.filesInTemp.isEmpty() && response.databases.isEmpty()) {
+                if (response.filesInTemp.isEmpty() && response.databases.isEmpty() && response.configDatabases.isEmpty()) {
                     continue;
                 }
                 builder.startObject(e.getKey());
@@ -126,6 +130,9 @@ public class GeoIpDownloaderStatsAction extends ActionType<GeoIpDownloaderStatsA
                 if (response.filesInTemp.isEmpty() == false) {
                     builder.array("files_in_temp", response.filesInTemp.toArray(String[]::new));
                 }
+                if (response.configDatabases.isEmpty() == false) {
+                    builder.array("config_databases", response.configDatabases.toArray(String[]::new));
+                }
                 builder.endObject();
             }
             builder.endObject();
@@ -152,19 +159,39 @@ public class GeoIpDownloaderStatsAction extends ActionType<GeoIpDownloaderStatsA
         private final GeoIpDownloaderStats stats;
         private final Set<String> databases;
         private final Set<String> filesInTemp;
+        private final Set<String> configDatabases;
 
         protected NodeResponse(StreamInput in) throws IOException {
             super(in);
             stats = in.readBoolean() ? new GeoIpDownloaderStats(in) : null;
             databases = in.readSet(StreamInput::readString);
             filesInTemp = in.readSet(StreamInput::readString);
+            configDatabases = in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readSet(StreamInput::readString) : null;
         }
 
-        protected NodeResponse(DiscoveryNode node, GeoIpDownloaderStats stats, Set<String> databases, Set<String> filesInTemp) {
+        protected NodeResponse(DiscoveryNode node, GeoIpDownloaderStats stats, Set<String> databases, Set<String> filesInTemp,
+                               Set<String> configDatabases) {
             super(node);
             this.stats = stats;
             this.databases = databases;
             this.filesInTemp = filesInTemp;
+            this.configDatabases = configDatabases;
+        }
+
+        public GeoIpDownloaderStats getStats() {
+            return stats;
+        }
+
+        public Set<String> getDatabases() {
+            return databases;
+        }
+
+        public Set<String> getFilesInTemp() {
+            return filesInTemp;
+        }
+
+        public Set<String> getConfigDatabases() {
+            return configDatabases;
         }
 
         @Override
@@ -176,6 +203,9 @@ public class GeoIpDownloaderStatsAction extends ActionType<GeoIpDownloaderStatsA
             }
             out.writeCollection(databases, StreamOutput::writeString);
             out.writeCollection(filesInTemp, StreamOutput::writeString);
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                out.writeCollection(configDatabases, StreamOutput::writeString);
+            }
         }
 
         @Override
@@ -183,12 +213,15 @@ public class GeoIpDownloaderStatsAction extends ActionType<GeoIpDownloaderStatsA
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             NodeResponse that = (NodeResponse) o;
-            return stats.equals(that.stats) && databases.equals(that.databases) && filesInTemp.equals(that.filesInTemp);
+            return stats.equals(that.stats) &&
+                databases.equals(that.databases) &&
+                filesInTemp.equals(that.filesInTemp) &&
+                Objects.equals(configDatabases, that.configDatabases);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(stats, databases, filesInTemp);
+            return Objects.hash(stats, databases, filesInTemp, configDatabases);
         }
     }
 }

+ 2 - 1
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsTransportAction.java

@@ -66,6 +66,7 @@ public class GeoIpDownloaderStatsTransportAction extends TransportNodesAction<Re
     protected NodeResponse nodeOperation(NodeRequest request, Task task) {
         GeoIpDownloader geoIpTask = geoIpDownloaderTaskExecutor.getCurrentTask();
         GeoIpDownloaderStats stats = geoIpTask == null || geoIpTask.getStatus() == null ? null : geoIpTask.getStatus();
-        return new NodeResponse(transportService.getLocalNode(), stats, registry.getAvailableDatabases(), registry.getFilesInTemp());
+        return new NodeResponse(transportService.getLocalNode(), stats, registry.getAvailableDatabases(), registry.getFilesInTemp(),
+            registry.getConfigDatabases());
     }
 }

+ 49 - 11
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java

@@ -71,6 +71,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
@@ -80,13 +81,18 @@ import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.Persist
 import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.TYPE;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory.
@@ -97,24 +103,25 @@ public class DatabaseRegistryTests extends ESTestCase {
     private ThreadPool threadPool;
     private DatabaseRegistry databaseRegistry;
     private ResourceWatcherService resourceWatcherService;
+    private IngestService ingestService;
 
     @Before
     public void setup() throws IOException {
-        final Path geoIpDir = createTempDir();
         final Path geoIpConfigDir = createTempDir();
         Files.createDirectories(geoIpConfigDir);
-        copyDatabaseFiles(geoIpDir);
+        GeoIpCache cache = new GeoIpCache(1000);
+        LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, cache);
+        copyDatabaseFiles(geoIpConfigDir, localDatabases);
 
         threadPool = new TestThreadPool(LocalDatabases.class.getSimpleName());
         Settings settings = Settings.builder().put("resource.reload.interval.high", TimeValue.timeValueMillis(100)).build();
         resourceWatcherService = new ResourceWatcherService(settings, threadPool);
 
         client = mock(Client.class);
-        GeoIpCache cache = new GeoIpCache(1000);
-        LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache);
+        ingestService = mock(IngestService.class);
         geoIpTmpDir = createTempDir();
         databaseRegistry = new DatabaseRegistry(geoIpTmpDir, client, cache, localDatabases, Runnable::run);
-        databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
+        databaseRegistry.initialize("nodeId", resourceWatcherService, ingestService);
     }
 
     @After
@@ -139,11 +146,17 @@ public class DatabaseRegistryTests extends ESTestCase {
             .routingTable(createIndexRoutingTable())
             .build();
 
-        assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
+        int numPipelinesToBeReloaded = randomInt(4);
+        List<String> pipelineIds = IntStream.range(0, numPipelinesToBeReloaded).mapToObj(String::valueOf).collect(Collectors.toList());
+        when(ingestService.getPipelineWithProcessorType(any(), any())).thenReturn(pipelineIds);
+
+        assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb"), nullValue());
+        // Nothing should be downloaded, since the database is no longer valid (older than 30 days)
         databaseRegistry.checkDatabases(state);
-        DatabaseReaderLazyLoader database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false);
+        DatabaseReaderLazyLoader database = databaseRegistry.getDatabase("GeoIP2-City.mmdb");
         assertThat(database, nullValue());
         verify(client, times(0)).search(any());
+        verify(ingestService, times(0)).reloadPipeline(anyString());
         try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
             assertEquals(0, files.count());
         }
@@ -159,10 +172,16 @@ public class DatabaseRegistryTests extends ESTestCase {
                 .localNodeId("_id1"))
             .routingTable(createIndexRoutingTable())
             .build();
+        // Database should be downloaded
         databaseRegistry.checkDatabases(state);
-        database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false);
+        database = databaseRegistry.getDatabase("GeoIP2-City.mmdb");
         assertThat(database, notNullValue());
         verify(client, times(10)).search(any());
+        try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
+            assertThat(files.count(), greaterThanOrEqualTo(1L));
+        }
+        // First time GeoIP2-City.mmdb is downloaded, so a pipeline reload can happen:
+        verify(ingestService, times(numPipelinesToBeReloaded)).reloadPipeline(anyString());
         //30 days check passed but we mocked mmdb data so parsing will fail
         expectThrows(InvalidDatabaseException.class, database::get);
     }
@@ -184,7 +203,7 @@ public class DatabaseRegistryTests extends ESTestCase {
             .build();
 
         databaseRegistry.checkDatabases(state);
-        assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
+        assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb"), nullValue());
         verify(client, never()).search(any());
         try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
             assertThat(files.collect(Collectors.toList()), empty());
@@ -206,7 +225,7 @@ public class DatabaseRegistryTests extends ESTestCase {
             .build();
 
         databaseRegistry.checkDatabases(state);
-        assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
+        assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb"), nullValue());
         verify(client, never()).search(any());
         try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
             assertThat(files.collect(Collectors.toList()), empty());
@@ -227,7 +246,7 @@ public class DatabaseRegistryTests extends ESTestCase {
         mockSearches("GeoIP2-City.mmdb", 0, 9);
 
         databaseRegistry.checkDatabases(state);
-        assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
+        assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb"), nullValue());
         verify(client, never()).search(any());
         try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
             assertThat(files.collect(Collectors.toList()), empty());
@@ -273,6 +292,25 @@ public class DatabaseRegistryTests extends ESTestCase {
         verify(client, times(10)).search(any());
     }
 
+    public void testUpdateDatabase() throws Exception {
+        int numPipelinesToBeReloaded = randomInt(4);
+        List<String> pipelineIds = IntStream.range(0, numPipelinesToBeReloaded).mapToObj(String::valueOf).collect(Collectors.toList());
+        when(ingestService.getPipelineWithProcessorType(any(), any())).thenReturn(pipelineIds);
+
+        databaseRegistry.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file"));
+
+        // Updating the first time may trigger a reload.
+        verify(ingestService, times(1)).addIngestClusterStateListener(any());
+        verify(ingestService, times(1)).getPipelineWithProcessorType(any(), any());
+        verify(ingestService, times(numPipelinesToBeReloaded)).reloadPipeline(anyString());
+        verifyNoMoreInteractions(ingestService);
+        reset(ingestService);
+
+        // Subsequent updates shouldn't trigger a reload.
+        databaseRegistry.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file"));
+        verifyZeroInteractions(ingestService);
+    }
+
     private String mockSearches(String databaseName, int firstChunk, int lastChunk) throws IOException {
         String dummyContent = "test: " + databaseName;
         List<byte[]> data = gzip(databaseName, dummyContent, lastChunk - firstChunk + 1);

+ 107 - 72
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.IngestService;
+import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.RandomDocumentPicks;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.test.ESTestCase;
@@ -35,6 +36,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -45,7 +47,8 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -53,22 +56,24 @@ import static org.mockito.Mockito.when;
 public class GeoIpProcessorFactoryTests extends ESTestCase {
 
     private Path geoipTmpDir;
+    private Path geoIpConfigDir;
+    private LocalDatabases localDatabases;
     private DatabaseRegistry databaseRegistry;
     private ClusterService clusterService;
 
     @Before
     public void loadDatabaseReaders() throws IOException {
-        final Path geoIpDir = createTempDir();
         final Path configDir = createTempDir();
-        final Path geoIpConfigDir = configDir.resolve("ingest-geoip");
+        geoIpConfigDir = configDir.resolve("ingest-geoip");
         Files.createDirectories(geoIpConfigDir);
-        copyDatabaseFiles(geoIpDir);
 
         Client client = mock(Client.class);
         GeoIpCache cache = new GeoIpCache(1000);
-        LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000));
+        localDatabases = new LocalDatabases(geoIpConfigDir, new GeoIpCache(1000));
+        copyDatabaseFiles(geoIpConfigDir, localDatabases);
         geoipTmpDir = createTempDir();
         databaseRegistry = new DatabaseRegistry(geoipTmpDir, client, cache, localDatabases, Runnable::run);
+        databaseRegistry.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
         clusterService = mock(ClusterService.class);
         when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
     }
@@ -86,7 +91,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         config.put("field", "_field");
         String processorTag = randomAlphaOfLength(10);
 
-        GeoIpProcessor processor = factory.create(null, processorTag, null, config);
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config);
         assertThat(processor.getTag(), equalTo(processorTag));
         assertThat(processor.getField(), equalTo("_field"));
         assertThat(processor.getTargetField(), equalTo("geoip"));
@@ -103,7 +108,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         config.put("ignore_missing", true);
         String processorTag = randomAlphaOfLength(10);
 
-        GeoIpProcessor processor = factory.create(null, processorTag, null, config);
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config);
         assertThat(processor.getTag(), equalTo(processorTag));
         assertThat(processor.getField(), equalTo("_field"));
         assertThat(processor.getTargetField(), equalTo("geoip"));
@@ -120,7 +125,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         config.put("database_file", "GeoLite2-Country.mmdb");
         String processorTag = randomAlphaOfLength(10);
 
-        GeoIpProcessor processor = factory.create(null, processorTag, null, config);
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config);
 
         assertThat(processor.getTag(), equalTo(processorTag));
         assertThat(processor.getField(), equalTo("_field"));
@@ -138,7 +143,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         config.put("database_file", "GeoLite2-ASN.mmdb");
         String processorTag = randomAlphaOfLength(10);
 
-        GeoIpProcessor processor = factory.create(null, processorTag, null, config);
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config);
 
         assertThat(processor.getTag(), equalTo(processorTag));
         assertThat(processor.getField(), equalTo("_field"));
@@ -153,7 +158,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("target_field", "_field");
-        GeoIpProcessor processor = factory.create(null, null, null, config);
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config);
         assertThat(processor.getField(), equalTo("_field"));
         assertThat(processor.getTargetField(), equalTo("_field"));
         assertFalse(processor.isIgnoreMissing());
@@ -164,7 +169,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-Country.mmdb");
-        GeoIpProcessor processor = factory.create(null, null, null, config);
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config);
         assertThat(processor.getField(), equalTo("_field"));
         assertThat(processor.getTargetField(), equalTo("geoip"));
         assertThat(processor.getDatabaseType(), equalTo("GeoLite2-Country"));
@@ -201,6 +206,9 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildNonExistingDbFile() throws Exception {
+        Files.copy(GeoIpProcessorFactoryTests.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
+            geoipTmpDir.resolve("GeoLite2-City.mmdb"));
+        databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City.mmdb"));
         GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
         Map<String, Object> config = new HashMap<>();
@@ -210,6 +218,16 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         assertThat(e.getMessage(), equalTo("[database_file] database file [does-not-exist.mmdb] doesn't exist"));
     }
 
+    public void testBuildNoDatabasesDownloaded() throws Exception {
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
+
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "_field");
+        config.put("database_file", "does-not-exist-yet.mmdb");
+        Processor processor = factory.create(null, null, null, config);
+        assertThat(processor, instanceOf(GeoIpProcessor.DatabaseUnavailableProcessor.class));
+    }
+
     public void testBuildFields() throws Exception {
         GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
@@ -228,7 +246,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("properties", fieldNames);
-        GeoIpProcessor processor = factory.create(null, null, null, config);
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config);
         assertThat(processor.getField(), equalTo("_field"));
         assertThat(processor.getProperties(), equalTo(properties));
         assertFalse(processor.isIgnoreMissing());
@@ -252,21 +270,20 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testLazyLoading() throws Exception {
-        final Path geoIpDir = createTempDir();
         final Path configDir = createTempDir();
         final Path geoIpConfigDir = configDir.resolve("ingest-geoip");
         Files.createDirectories(geoIpConfigDir);
-        copyDatabaseFiles(geoIpDir);
+        GeoIpCache cache = new GeoIpCache(1000);
+        LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, cache);
+        copyDatabaseFiles(geoIpConfigDir, localDatabases);
 
         // Loading another database reader instances, because otherwise we can't test lazy loading as the
         // database readers used at class level are reused between tests. (we want to keep that otherwise running this
         // test will take roughly 4 times more time)
         Client client = mock(Client.class);
-        GeoIpCache cache = new GeoIpCache(1000);
-        LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache);
         DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run);
         GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
-        for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
+        for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getConfigDatabases().values()) {
             assertNull(lazyLoader.databaseReader.get());
         }
 
@@ -276,43 +293,43 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-City.mmdb");
-        final GeoIpProcessor city = factory.create(null, "_tag", null, config);
+        final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config);
 
         // these are lazy loaded until first use so we expect null here
-        assertNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb", true).databaseReader.get());
+        assertNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb").databaseReader.get());
         city.execute(document);
         // the first ingest should trigger a database load
-        assertNotNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb", true).databaseReader.get());
+        assertNotNull(databaseRegistry.getDatabase("GeoLite2-City.mmdb").databaseReader.get());
 
         config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-Country.mmdb");
-        final GeoIpProcessor country = factory.create(null, "_tag", null, config);
+        final GeoIpProcessor country = (GeoIpProcessor) factory.create(null, "_tag", null, config);
 
         // these are lazy loaded until first use so we expect null here
-        assertNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb", true).databaseReader.get());
+        assertNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb").databaseReader.get());
         country.execute(document);
         // the first ingest should trigger a database load
-        assertNotNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb", true).databaseReader.get());
+        assertNotNull(databaseRegistry.getDatabase("GeoLite2-Country.mmdb").databaseReader.get());
 
         config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-ASN.mmdb");
-        final GeoIpProcessor asn = factory.create(null, "_tag", null, config);
+        final GeoIpProcessor asn = (GeoIpProcessor) factory.create(null, "_tag", null, config);
 
         // these are lazy loaded until first use so we expect null here
-        assertNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb", true).databaseReader.get());
+        assertNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb").databaseReader.get());
         asn.execute(document);
         // the first ingest should trigger a database load
-        assertNotNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb", true).databaseReader.get());
+        assertNotNull(databaseRegistry.getDatabase("GeoLite2-ASN.mmdb").databaseReader.get());
     }
 
     public void testLoadingCustomDatabase() throws IOException {
-        final Path geoIpDir = createTempDir();
         final Path configDir = createTempDir();
         final Path geoIpConfigDir = configDir.resolve("ingest-geoip");
         Files.createDirectories(geoIpConfigDir);
-        copyDatabaseFiles(geoIpDir);
+        LocalDatabases localDatabases = new LocalDatabases(geoIpConfigDir, new GeoIpCache(1000));
+        copyDatabaseFiles(geoIpConfigDir, localDatabases);
         // fake the GeoIP2-City database
         copyDatabaseFile(geoIpConfigDir, "GeoLite2-City.mmdb");
         Files.move(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), geoIpConfigDir.resolve("GeoIP2-City.mmdb"));
@@ -323,13 +340,12 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
          */
         ThreadPool threadPool = new TestThreadPool("test");
         ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool);
-        LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000));
         Client client = mock(Client.class);
         GeoIpCache cache = new GeoIpCache(1000);
         DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run);
         databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
         GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
-        for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
+        for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getConfigDatabases().values()) {
             assertNull(lazyLoader.databaseReader.get());
         }
 
@@ -339,35 +355,24 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoIP2-City.mmdb");
-        final GeoIpProcessor city = factory.create(null, "_tag", null, config);
+        final GeoIpProcessor city = (GeoIpProcessor) factory.create(null, "_tag", null, config);
 
         // these are lazy loaded until first use so we expect null here
-        assertNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb", true).databaseReader.get());
+        assertNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb").databaseReader.get());
         city.execute(document);
         // the first ingest should trigger a database load
-        assertNotNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb", true).databaseReader.get());
+        assertNotNull(databaseRegistry.getDatabase("GeoIP2-City.mmdb").databaseReader.get());
         resourceWatcherService.close();
         threadPool.shutdown();
     }
 
     public void testFallbackUsingDefaultDatabases() throws Exception {
         GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
-        {
-            Map<String, Object> config = new HashMap<>();
-            config.put("field", "source_field");
-            config.put("fallback_to_default_databases", false);
-            Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, null, config));
-            assertThat(e.getMessage(), equalTo("[database_file] database file [GeoLite2-City.mmdb] doesn't exist"));
-        }
-        {
-            Map<String, Object> config = new HashMap<>();
-            config.put("field", "source_field");
-            if (randomBoolean()) {
-                config.put("fallback_to_default_databases", true);
-            }
-            GeoIpProcessor processor = factory.create(null, null, null, config);
-            assertThat(processor, notNullValue());
-        }
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "source_field");
+        config.put("fallback_to_default_databases", randomBoolean());
+        factory.create(null, null, null, config);
+        assertWarnings(GeoIpProcessor.DEFAULT_DATABASES_DEPRECATION_MESSAGE);
     }
 
     public void testDefaultDatabaseWithTaskPresent() throws Exception {
@@ -385,63 +390,93 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         config.put("field", "_field");
         String processorTag = randomAlphaOfLength(10);
 
-        GeoIpProcessor processor = factory.create(null, processorTag, null, config);
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, processorTag, null, config);
 
         processor.execute(RandomDocumentPicks.randomIngestDocument(random(), Map.of("_field", "89.160.20.128")));
     }
 
-    public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception {
-        copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb");
+    public void testUpdateDatabaseWhileIngesting() throws Exception {
         GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
-        // fallback_to_default_databases=true, first use default city db then a custom city db:
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "source_field");
+        GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config);
+        Map<String, Object> document = new HashMap<>();
+        document.put("source_field", "89.160.20.128");
         {
-            Map<String, Object> config = new HashMap<>();
-            config.put("field", "source_field");
-            if (randomBoolean()) {
-                config.put("fallback_to_default_databases", true);
-            }
-            GeoIpProcessor processor = factory.create(null, null, null, config);
-            Map<String, Object> document = new HashMap<>();
-            document.put("source_field", "89.160.20.128");
             IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
             processor.execute(ingestDocument);
             Map<?, ?> geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
             assertThat(geoData.get("city_name"), equalTo("Tumba"));
-
+        }
+        {
+            copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb");
+            IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
             databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
-            ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
             processor.execute(ingestDocument);
-            geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
+            Map<?, ?> geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
             assertThat(geoData.get("city_name"), equalTo("Linköping"));
         }
-        // fallback_to_default_databases=false, first use a custom city db then remove the custom db and expect failure:
+        {
+            IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+            databaseRegistry.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
+            localDatabases.updateDatabase(geoIpConfigDir.resolve("GeoLite2-City.mmdb"), false);
+            Exception e = expectThrows(ResourceNotFoundException.class, () -> processor.execute(ingestDocument));
+            assertThat(e.getMessage(), equalTo("database file [GeoLite2-City.mmdb] doesn't exist"));
+        }
+    }
+
+    public void testDatabaseNotReadyYet() throws Exception {
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
+
         {
             Map<String, Object> config = new HashMap<>();
             config.put("field", "source_field");
-            config.put("fallback_to_default_databases", false);
-            GeoIpProcessor processor = factory.create(null, null, null, config);
+            config.put("database_file", "GeoLite2-City-Test.mmdb");
+
             Map<String, Object> document = new HashMap<>();
             document.put("source_field", "89.160.20.128");
             IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+
+            GeoIpProcessor.DatabaseUnavailableProcessor processor =
+                (GeoIpProcessor.DatabaseUnavailableProcessor) factory.create(null, null, null, config);
             processor.execute(ingestDocument);
+            assertThat(ingestDocument.getSourceAndMetadata().get("geoip"), nullValue());
+            assertThat(ingestDocument.getSourceAndMetadata().get("tags"),
+                equalTo(List.of("_geoip_database_unavailable_GeoLite2-City-Test.mmdb")));
+        }
+
+        copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb");
+        databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
+
+        {
+            Map<String, Object> config = new HashMap<>();
+            config.put("field", "source_field");
+            config.put("database_file", "GeoLite2-City-Test.mmdb");
+
+            Map<String, Object> document = new HashMap<>();
+            document.put("source_field", "89.160.20.128");
+            IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+
+            GeoIpProcessor processor = (GeoIpProcessor) factory.create(null, null, null, config);
+            processor.execute(ingestDocument);
+            assertThat(ingestDocument.getSourceAndMetadata().get("tags"), nullValue());
             Map<?, ?> geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
             assertThat(geoData.get("city_name"), equalTo("Linköping"));
-            databaseRegistry.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
-            Exception e = expectThrows(ResourceNotFoundException.class, () -> processor.execute(ingestDocument));
-            assertThat(e.getMessage(), equalTo("database file [GeoLite2-City.mmdb] doesn't exist"));
         }
     }
 
     private static void copyDatabaseFile(final Path path, final String databaseFilename) throws IOException {
         Files.copy(
             new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/" + databaseFilename)),
-            path.resolve(databaseFilename)
+            path.resolve(databaseFilename),
+            StandardCopyOption.REPLACE_EXISTING
         );
     }
 
-    static void copyDatabaseFiles(final Path path) throws IOException {
+    static void copyDatabaseFiles(final Path path, LocalDatabases localDatabases) throws IOException {
         for (final String databaseFilename : IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES) {
             copyDatabaseFile(path, databaseFilename);
+            localDatabases.updateDatabase(path.resolve(databaseFilename), true);
         }
     }
 

+ 28 - 41
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java

@@ -20,11 +20,14 @@ import org.junit.After;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.nio.file.CopyOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 
+import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 
 public class LocalDatabasesTests extends ESTestCase {
 
@@ -46,19 +49,13 @@ public class LocalDatabasesTests extends ESTestCase {
 
     public void testLocalDatabasesEmptyConfig() throws Exception {
         Path configDir = createTempDir();
-        LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0));
+        LocalDatabases localDatabases = new LocalDatabases(configDir, new GeoIpCache(0));
         localDatabases.initialize(resourceWatcherService);
 
-        assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
-        assertThat(localDatabases.getConfigDatabases().size(), equalTo(0));
-        DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true);
-        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN"));
-
-        loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true);
-        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
-
-        loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true);
-        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country"));
+        assertThat(localDatabases.getConfigDatabases(), anEmptyMap());
+        assertThat(localDatabases.getDatabase("GeoLite2-ASN.mmdb"), nullValue());
+        assertThat(localDatabases.getDatabase("GeoLite2-City.mmdb"), nullValue());
+        assertThat(localDatabases.getDatabase("GeoLite2-Country.mmdb"), nullValue());
     }
 
     public void testDatabasesConfigDir() throws Exception {
@@ -66,55 +63,48 @@ public class LocalDatabasesTests extends ESTestCase {
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoIP2-City-Test.mmdb"), configDir.resolve("GeoIP2-City.mmdb"));
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"));
 
-        LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0));
+        LocalDatabases localDatabases = new LocalDatabases(configDir, new GeoIpCache(0));
         localDatabases.initialize(resourceWatcherService);
 
-        assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
         assertThat(localDatabases.getConfigDatabases().size(), equalTo(2));
-        DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true);
-        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN"));
-
-        loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true);
+        DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
         assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
 
-        loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true);
-        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country"));
-
-        loader = localDatabases.getDatabase("GeoIP2-City.mmdb", true);
+        loader = localDatabases.getDatabase("GeoIP2-City.mmdb");
         assertThat(loader.getDatabaseType(), equalTo("GeoIP2-City"));
     }
 
     public void testDatabasesDynamicUpdateConfigDir() throws Exception {
-        Path configDir = createTempDir();
-        LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0));
+        Path configDir = prepareConfigDir();
+        LocalDatabases localDatabases = new LocalDatabases(configDir, new GeoIpCache(0));
         localDatabases.initialize(resourceWatcherService);
         {
-            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
-            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true);
+            assertThat(localDatabases.getConfigDatabases().size(), equalTo(3));
+            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN"));
 
-            loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true);
+            loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
 
-            loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true);
+            loader = localDatabases.getDatabase("GeoLite2-Country.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country"));
         }
 
+        CopyOption option = StandardCopyOption.REPLACE_EXISTING;
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoIP2-City-Test.mmdb"), configDir.resolve("GeoIP2-City.mmdb"));
-        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"));
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"), option);
         assertBusy(() -> {
-            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
-            assertThat(localDatabases.getConfigDatabases().size(), equalTo(2));
-            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb", true);
+            assertThat(localDatabases.getConfigDatabases().size(), equalTo(4));
+            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN"));
 
-            loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true);
+            loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
 
-            loader = localDatabases.getDatabase("GeoLite2-Country.mmdb", true);
+            loader = localDatabases.getDatabase("GeoLite2-Country.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country"));
 
-            loader = localDatabases.getDatabase("GeoIP2-City.mmdb", true);
+            loader = localDatabases.getDatabase("GeoIP2-City.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoIP2-City"));
         });
     }
@@ -123,14 +113,13 @@ public class LocalDatabasesTests extends ESTestCase {
         Path configDir = createTempDir();
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City.mmdb"), configDir.resolve("GeoLite2-City.mmdb"));
         GeoIpCache cache = new GeoIpCache(1000); // real cache to test purging of entries upon a reload
-        LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, cache);
+        LocalDatabases localDatabases = new LocalDatabases(configDir, cache);
         localDatabases.initialize(resourceWatcherService);
         {
             assertThat(cache.count(), equalTo(0));
-            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
             assertThat(localDatabases.getConfigDatabases().size(), equalTo(1));
 
-            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true);
+            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
             CityResponse cityResponse = loader.getCity(InetAddresses.forString("89.160.20.128"));
             assertThat(cityResponse.getCity().getName(), equalTo("Tumba"));
@@ -140,11 +129,10 @@ public class LocalDatabasesTests extends ESTestCase {
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"),
             StandardCopyOption.REPLACE_EXISTING);
         assertBusy(() -> {
-            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
             assertThat(localDatabases.getConfigDatabases().size(), equalTo(1));
             assertThat(cache.count(), equalTo(0));
 
-            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb", true);
+            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
             assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
             CityResponse cityResponse = loader.getCity(InetAddresses.forString("89.160.20.128"));
             assertThat(cityResponse.getCity().getName(), equalTo("Linköping"));
@@ -153,13 +141,12 @@ public class LocalDatabasesTests extends ESTestCase {
 
         Files.delete(configDir.resolve("GeoLite2-City.mmdb"));
         assertBusy(() -> {
-            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
             assertThat(localDatabases.getConfigDatabases().size(), equalTo(0));
             assertThat(cache.count(), equalTo(0));
         });
     }
 
-    private static Path prepareModuleDir() throws IOException {
+    private static Path prepareConfigDir() throws IOException {
         Path dir = createTempDir();
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-ASN.mmdb"), dir.resolve("GeoLite2-ASN.mmdb"));
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City.mmdb"), dir.resolve("GeoLite2-City.mmdb"));

+ 2 - 1
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStatsActionNodeResponseSerializingTests.java

@@ -32,7 +32,8 @@ public class GeoIpDownloaderStatsActionNodeResponseSerializingTests extends
         DiscoveryNode node = new DiscoveryNode("id", buildNewFakeTransportAddress(), Version.CURRENT);
         Set<String> databases = Set.copyOf(randomList(10, () -> randomAlphaOfLengthBetween(5, 10)));
         Set<String> files = Set.copyOf(randomList(10, () -> randomAlphaOfLengthBetween(5, 10)));
+        Set<String> configDatabases = Set.copyOf(randomList(10, () -> randomAlphaOfLengthBetween(5, 10)));
         return new GeoIpDownloaderStatsAction.NodeResponse(node, GeoIpDownloaderStatsSerializingTests.createRandomInstance(), databases,
-            files);
+            files, configDatabases);
     }
 }

+ 29 - 0
modules/ingest-geoip/src/yamlRestTest/java/org/elasticsearch/ingest/geoip/IngestGeoIpClientYamlTestSuiteIT.java

@@ -11,8 +11,17 @@ package org.elasticsearch.ingest.geoip;
 import com.carrotsearch.randomizedtesting.annotations.Name;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
+import org.elasticsearch.client.Request;
 import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
 import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
+import org.junit.Before;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
 
 public class IngestGeoIpClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
 
@@ -24,4 +33,24 @@ public class IngestGeoIpClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase
     public static Iterable<Object[]> parameters() throws Exception {
         return ESClientYamlSuiteTestCase.createParameters();
     }
+
+    @Before
+    public void waitForDatabases() throws Exception {
+        assertBusy(() -> {
+            Request request = new Request("GET", "/_ingest/geoip/stats");
+            Map<String, Object> response = entityAsMap(client().performRequest(request));
+
+            Map<?, ?> downloadStats = (Map<?, ?>) response.get("stats");
+            assertThat(downloadStats.get("databases_count"), equalTo(3));
+
+            Map<?, ?> nodes = (Map<?, ?>) response.get("nodes");
+            assertThat(nodes.size(), equalTo(1));
+            Map<?, ?> node = (Map<?, ?>) nodes.values().iterator().next();
+            List<String> databases = ((List<?>) node.get("databases")).stream()
+                .map(o -> (String) ((Map<?, ?>) o).get("name"))
+                .collect(Collectors.toList());
+            assertThat(databases, containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb"));
+        });
+    }
+
 }

+ 63 - 63
modules/ingest-geoip/src/yamlRestTest/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml

@@ -21,22 +21,22 @@
         index: test
         id: 1
         pipeline: "my_pipeline"
-        body: {field1: "128.101.101.101"}
+        body: {field1: "89.160.20.128"}
 
   - do:
       get:
         index: test
         id: 1
-  - match: { _source.field1: "128.101.101.101" }
+  - match: { _source.field1: "89.160.20.128" }
   - length: { _source.geoip: 7 }
-  - match: { _source.geoip.city_name: "Minneapolis" }
-  - match: { _source.geoip.country_iso_code: "US" }
-  - match: { _source.geoip.location.lon: -93.2548 }
-  - match: { _source.geoip.location.lat: 44.9399 }
-  - match: { _source.geoip.region_iso_code: "US-MN" }
-  - match: { _source.geoip.country_name: "United States" }
-  - match: { _source.geoip.region_name: "Minnesota" }
-  - match: { _source.geoip.continent_name: "North America" }
+  - match: { _source.geoip.city_name: "Linköping" }
+  - match: { _source.geoip.country_iso_code: "SE" }
+  - match: { _source.geoip.location.lon: 15.6167 }
+  - match: { _source.geoip.location.lat: 58.4167 }
+  - match: { _source.geoip.region_iso_code: "SE-E" }
+  - match: { _source.geoip.country_name: "Sweden" }
+  - match: { _source.geoip.region_name: "Östergötland County" }
+  - match: { _source.geoip.continent_name: "Europe" }
 
 ---
 "Test geoip processor with list":
@@ -62,23 +62,23 @@
         index: test
         id: 1
         pipeline: "my_pipeline"
-        body: {field1: ["128.101.101.101", "127.0.0.1"]}
+        body: {field1: ["89.160.20.128", "127.0.0.1"]}
 
   - do:
       get:
         index: test
         id: 1
-  - match: { _source.field1: ["128.101.101.101", "127.0.0.1"] }
+  - match: { _source.field1: ["89.160.20.128", "127.0.0.1"] }
   - length: { _source.geoip: 2 }
   - length: { _source.geoip.0: 7 }
-  - match: { _source.geoip.0.city_name: "Minneapolis" }
-  - match: { _source.geoip.0.country_iso_code: "US" }
-  - match: { _source.geoip.0.location.lon: -93.2548 }
-  - match: { _source.geoip.0.location.lat: 44.9399 }
-  - match: { _source.geoip.0.region_iso_code: "US-MN" }
-  - match: { _source.geoip.0.country_name: "United States" }
-  - match: { _source.geoip.0.region_name: "Minnesota" }
-  - match: { _source.geoip.0.continent_name: "North America" }
+  - match: { _source.geoip.0.city_name: "Linköping" }
+  - match: { _source.geoip.0.country_iso_code: "SE" }
+  - match: { _source.geoip.0.location.lon: 15.6167 }
+  - match: { _source.geoip.0.location.lat: 58.4167 }
+  - match: { _source.geoip.0.region_iso_code: "SE-E" }
+  - match: { _source.geoip.0.country_name: "Sweden" }
+  - match: { _source.geoip.0.region_name: "Östergötland County" }
+  - match: { _source.geoip.0.continent_name: "Europe" }
   - match: { _source.geoip.1: null }
 
 ---
@@ -104,22 +104,22 @@
         index: test
         id: 1
         pipeline: "my_pipeline"
-        body: {field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"]}
+        body: {field1: ["127.0.0.1", "89.160.20.128", "89.160.20.128"]}
 
   - do:
       get:
         index: test
         id: 1
-  - match: { _source.field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"] }
+  - match: { _source.field1: ["127.0.0.1", "89.160.20.128", "89.160.20.128"] }
   - length: { _source.geoip: 7 }
-  - match: { _source.geoip.city_name: "Minneapolis" }
-  - match: { _source.geoip.country_iso_code: "US" }
-  - match: { _source.geoip.location.lon: -93.2548 }
-  - match: { _source.geoip.location.lat: 44.9399 }
-  - match: { _source.geoip.region_iso_code: "US-MN" }
-  - match: { _source.geoip.country_name: "United States" }
-  - match: { _source.geoip.region_name: "Minnesota" }
-  - match: { _source.geoip.continent_name: "North America" }
+  - match: { _source.geoip.city_name: "Linköping" }
+  - match: { _source.geoip.country_iso_code: "SE" }
+  - match: { _source.geoip.location.lon: 15.6167 }
+  - match: { _source.geoip.location.lat: 58.4167 }
+  - match: { _source.geoip.region_iso_code: "SE-E" }
+  - match: { _source.geoip.country_name: "Sweden" }
+  - match: { _source.geoip.region_name: "Östergötland County" }
+  - match: { _source.geoip.continent_name: "Europe" }
 
 ---
 "Test geoip processor with fields":
@@ -149,24 +149,24 @@
         index: test
         id: 1
         pipeline: "my_pipeline"
-        body: {field1: "128.101.101.101"}
+        body: {field1: "89.160.20.128"}
 
   - do:
       get:
         index: test
         id: 1
-  - match: { _source.field1: "128.101.101.101" }
+  - match: { _source.field1: "89.160.20.128" }
   - length: { _source.geoip: 9 }
-  - match: { _source.geoip.city_name: "Minneapolis" }
-  - match: { _source.geoip.country_iso_code: "US" }
-  - match: { _source.geoip.ip: "128.101.101.101" }
-  - match: { _source.geoip.location.lon: -93.2548 }
-  - match: { _source.geoip.location.lat: 44.9399 }
-  - match: { _source.geoip.timezone: "America/Chicago" }
-  - match: { _source.geoip.country_name: "United States" }
-  - match: { _source.geoip.region_iso_code: "US-MN" }
-  - match: { _source.geoip.region_name: "Minnesota" }
-  - match: { _source.geoip.continent_name: "North America" }
+  - match: { _source.geoip.city_name: "Linköping" }
+  - match: { _source.geoip.country_iso_code: "SE" }
+  - match: { _source.geoip.ip: "89.160.20.128" }
+  - match: { _source.geoip.location.lon: 15.6167 }
+  - match: { _source.geoip.location.lat: 58.4167 }
+  - match: { _source.geoip.timezone: "Europe/Stockholm" }
+  - match: { _source.geoip.country_name: "Sweden" }
+  - match: { _source.geoip.region_iso_code: "SE-E" }
+  - match: { _source.geoip.region_name: "Östergötland County" }
+  - match: { _source.geoip.continent_name: "Europe" }
 
 ---
 "Test geoip processor with different database file - GeoLite2-Country":
@@ -192,17 +192,17 @@
         index: test
         id: 1
         pipeline: "my_pipeline"
-        body: {field1: "128.101.101.101"}
+        body: {field1: "89.160.20.128"}
 
   - do:
       get:
         index: test
         id: 1
-  - match: { _source.field1: "128.101.101.101" }
+  - match: { _source.field1: "89.160.20.128" }
   - length: { _source.geoip: 3 }
-  - match: { _source.geoip.country_iso_code: "US" }
-  - match: { _source.geoip.country_name: "United States" }
-  - match: { _source.geoip.continent_name: "North America" }
+  - match: { _source.geoip.country_iso_code: "SE" }
+  - match: { _source.geoip.country_name: "Sweden" }
+  - match: { _source.geoip.continent_name: "Europe" }
 
 ---
 "Test geoip processor with geopoint mapping (both missing and including location)":
@@ -256,22 +256,22 @@
         index: test
         id: 2
         pipeline: "my_pipeline"
-        body: { field1: "128.101.101.101" }
+        body: { field1: "89.160.20.128" }
 
   - do:
       get:
         index: test
         id: 2
-  - match: { _source.field1: "128.101.101.101" }
+  - match: { _source.field1: "89.160.20.128" }
   - length: { _source.geoip: 7 }
-  - match: { _source.geoip.city_name: "Minneapolis" }
-  - match: { _source.geoip.country_iso_code: "US" }
-  - match: { _source.geoip.location.lon: -93.2548 }
-  - match: { _source.geoip.location.lat: 44.9399 }
-  - match: { _source.geoip.region_iso_code: "US-MN" }
-  - match: { _source.geoip.country_name: "United States" }
-  - match: { _source.geoip.region_name: "Minnesota" }
-  - match: { _source.geoip.continent_name: "North America" }
+  - match: { _source.geoip.city_name: "Linköping" }
+  - match: { _source.geoip.country_iso_code: "SE" }
+  - match: { _source.geoip.location.lon: 15.6167 }
+  - match: { _source.geoip.location.lat: 58.4167 }
+  - match: { _source.geoip.region_iso_code: "SE-E" }
+  - match: { _source.geoip.country_name: "Sweden" }
+  - match: { _source.geoip.region_name: "Östergötland County" }
+  - match: { _source.geoip.continent_name: "Europe" }
 
 ---
 "Test geoip processor with different database file - GeoLite2-ASN":
@@ -297,15 +297,15 @@
         index: test
         id: 1
         pipeline: "my_pipeline"
-        body: {field1: "82.171.64.0"}
+        body: {field1: "89.160.20.128"}
 
   - do:
       get:
         index: test
         id: 1
-  - match: { _source.field1: "82.171.64.0" }
+  - match: { _source.field1: "89.160.20.128" }
   - length: { _source.geoip: 4 }
-  - match: { _source.geoip.ip: "82.171.64.0" }
-  - match: { _source.geoip.asn: 1136 }
-  - match: { _source.geoip.organization_name: "KPN B.V." }
-  - match: { _source.geoip.network: "82.168.0.0/14" }
+  - match: { _source.geoip.ip: "89.160.20.128" }
+  - match: { _source.geoip.asn: 29518 }
+  - match: { _source.geoip.organization_name: "Bredband2 AB" }
+  - match: { _source.geoip.network: "89.160.0.0/17" }

+ 5 - 0
qa/smoke-test-ingest-with-all-dependencies/build.gradle

@@ -21,6 +21,7 @@ dependencies {
 
 testClusters.configureEach {
   setting 'xpack.security.enabled', 'false'
+  extraConfigFile 'ingest-geoip/GeoLite2-City.mmdb', file("${project.projectDir}/src/test/resources/GeoLite2-City.mmdb")
 }
 
 tasks.named("testingConventions").configure {
@@ -30,3 +31,7 @@ tasks.named("testingConventions").configure {
     }
   }
 }
+
+tasks.named("forbiddenPatterns").configure {
+  exclude '**/*.mmdb'
+}

BIN=BIN
qa/smoke-test-ingest-with-all-dependencies/src/test/resources/GeoLite2-City.mmdb


+ 8 - 8
qa/smoke-test-ingest-with-all-dependencies/src/test/resources/rest-api-spec/test/ingest/20_combine_processors.yml

@@ -46,7 +46,7 @@
         id: 1
         pipeline: "_id"
         body: {
-          log: "70.193.17.92 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\""
+          log: "89.160.20.128 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\""
         }
 
   - do:
@@ -61,11 +61,11 @@
   - match: { _source.referrer: "\"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\"" }
   - match: { _source.response: 200 }
   - match: { _source.bytes: 175208 }
-  - match: { _source.clientip: "70.193.17.92" }
+  - match: { _source.clientip: "89.160.20.128" }
   - match: { _source.httpversion: "1.1" }
   - match: { _source.timestamp: "2014-09-08T02:54:42.000Z" }
-  - match: { _source.geoip.continent_name: "North America" }
-  - match: { _source.geoip.country_iso_code: "US" }
+  - match: { _source.geoip.continent_name: "Europe" }
+  - match: { _source.geoip.country_iso_code: "SE" }
 
 ---
 "Test with date processor and ECS-v1":
@@ -104,7 +104,7 @@
         id: 1
         pipeline: "_id"
         body: {
-          log: "70.193.17.92 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\""
+          log: "89.160.20.128 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\""
         }
 
   - do:
@@ -118,11 +118,11 @@
   - match: { _source.http.request.referrer: "http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=" }
   - match: { _source.http.response.status_code: 200 }
   - match: { _source.http.response.body.bytes: 175208 }
-  - match: { _source.source.address: "70.193.17.92" }
+  - match: { _source.source.address: "89.160.20.128" }
   - match: { _source.http.version: "1.1" }
   - match: { _source.timestamp: "2014-09-08T02:54:42.000Z" }
-  - match: { _source.geoip.continent_name: "North America" }
-  - match: { _source.geoip.country_iso_code: "US" }
+  - match: { _source.geoip.continent_name: "Europe" }
+  - match: { _source.geoip.country_iso_code: "SE" }
 
 ---
 "Test mutate":

+ 26 - 2
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -56,10 +56,12 @@ import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -70,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.IntConsumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -791,7 +794,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         }
     }
 
-    void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
+    synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
         Map<String, PipelineHolder> existingPipelines = this.pipelines;
 
         // Lazy initialize these variables in order to favour the most like scenario that there are no pipeline changes:
@@ -890,7 +893,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
      * @param clazz the Processor class to look for
      * @return True if the pipeline contains an instance of the Processor class passed in
      */
-    public<P extends Processor> List<P> getProcessorsInPipeline(String pipelineId, Class<P> clazz) {
+    public <P extends Processor> List<P> getProcessorsInPipeline(String pipelineId, Class<P> clazz) {
         Pipeline pipeline = getPipeline(pipelineId);
         if (pipeline == null) {
             throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
@@ -919,6 +922,27 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         return processors;
     }
 
+    public <P extends Processor> Collection<String> getPipelineWithProcessorType(Class<P> clazz, Predicate<P> predicate) {
+        List<String> matchedPipelines = new LinkedList<>();
+        for (PipelineHolder holder : pipelines.values()) {
+            String pipelineId = holder.pipeline.getId();
+            List<P> processors = getProcessorsInPipeline(pipelineId, clazz);
+            if (processors.isEmpty() == false && processors.stream().anyMatch(predicate)) {
+                matchedPipelines.add(pipelineId);
+            }
+        }
+        return matchedPipelines;
+    }
+
+    public synchronized void reloadPipeline(String id) throws Exception {
+        PipelineHolder holder = pipelines.get(id);
+        Pipeline updatedPipeline =
+            Pipeline.create(id, holder.configuration.getConfigAsMap(), processorFactories, scriptService);
+        Map<String, PipelineHolder> updatedPipelines = new HashMap<>(this.pipelines);
+        updatedPipelines.put(id, new PipelineHolder(holder.configuration, updatedPipeline));
+        this.pipelines = Map.copyOf(updatedPipelines);
+    }
+
     private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
         String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
         String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";

+ 82 - 3
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -43,10 +43,7 @@ import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.xcontent.XContentType;
-import org.elasticsearch.xcontent.cbor.CborXContent;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
@@ -60,6 +57,9 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xcontent.cbor.CborXContent;
 import org.hamcrest.CustomTypeSafeMatcher;
 import org.junit.Before;
 import org.mockito.ArgumentMatcher;
@@ -90,6 +90,9 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static org.hamcrest.Matchers.containsString;
 import static org.elasticsearch.core.Tuple.tuple;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -394,6 +397,82 @@ public class IngestServiceTests extends ESTestCase {
         assertThat("pipeline with id [fakeID] does not exist", equalTo(e.getMessage()));
     }
 
+    public void testGetPipelineWithProcessorType() throws Exception {
+        IngestService ingestService = createWithProcessors();
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
+        ClusterState previousClusterState = clusterState;
+
+        PutPipelineRequest putRequest1 = new PutPipelineRequest("_id1", new BytesArray(
+            "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," +
+                "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"),
+            XContentType.JSON);
+        clusterState = IngestService.innerPut(putRequest1, clusterState);
+        PutPipelineRequest putRequest2 = new PutPipelineRequest("_id2", new BytesArray(
+            "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag2\"}}]}"),
+            XContentType.JSON);
+        clusterState = IngestService.innerPut(putRequest2, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+
+        assertThat(ingestService.getPipelineWithProcessorType(FakeProcessor.class, processor -> true), containsInAnyOrder("_id1", "_id2"));
+        assertThat(ingestService.getPipelineWithProcessorType(FakeProcessor.class, processor -> false), emptyIterable());
+        assertThat(ingestService.getPipelineWithProcessorType(WrappingProcessorImpl.class, processor -> true), containsInAnyOrder("_id1"));
+    }
+
+    public void testReloadPipeline() throws Exception {
+        boolean[] externalProperty = new boolean[] {false};
+
+        Map<String, Processor.Factory> processorFactories = new HashMap<>();
+        processorFactories.put("set", (factories, tag, description, config) -> {
+            String field = (String) config.remove("field");
+            String value = (String) config.remove("value");
+            if (externalProperty[0]) {
+                return new FakeProcessor("set", tag, description, (ingestDocument) ->ingestDocument.setFieldValue(field, value));
+            } else {
+                return new AbstractProcessor(tag, description) {
+                    @Override
+                    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+                        throw new RuntimeException("reload me");
+                    }
+
+                    @Override
+                    public String getType() {
+                        return "set";
+                    }
+                };
+            }
+        });
+
+        IngestService ingestService = createWithProcessors(processorFactories);
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
+        ClusterState previousClusterState = clusterState;
+
+        PutPipelineRequest putRequest1 = new PutPipelineRequest("_id1", new BytesArray(
+            "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}]}"),
+            XContentType.JSON);
+        clusterState = IngestService.innerPut(putRequest1, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+
+        {
+            Exception[] exceptionHolder = new Exception[1];
+            IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
+            ingestService.getPipeline("_id1").execute(ingestDocument, (ingestDocument1, e) -> exceptionHolder[0] = e);
+            assertThat(exceptionHolder[0], notNullValue());
+            assertThat(exceptionHolder[0].getMessage(), containsString("reload me"));
+            assertThat(ingestDocument.getSourceAndMetadata().get("_field"), nullValue());
+        }
+
+        externalProperty[0] = true;
+        ingestService.reloadPipeline("_id1");
+
+        {
+            Exception[] holder = new Exception[1];
+            IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
+            ingestService.getPipeline("_id1").execute(ingestDocument, (ingestDocument1, e) -> holder[0] = e);
+            assertThat(holder[0], nullValue());
+            assertThat(ingestDocument.getSourceAndMetadata().get("_field"), equalTo("_value"));
+        }
+    }
+
     public void testGetProcessorsInPipelineComplexConditional() throws Exception {
         LongSupplier relativeTimeProvider = mock(LongSupplier.class);
         String scriptName = "conditionalScript";