Explorar el Código

Change GeoIP downloader policy after 30 days of no updates (#74099)

This PR changes the way GeoIpDownloader and GeoIpProcessor handle situation when we are unable to update databases for 30 days. In that case:

GeoIpDownloader will delete all chunks from .geoip_databases index
DatabaseRegistry will delete all files on ingest nodes
GeoIpProcessor will tag document with tags: ["_geoip_expired_database"] field (same way as in Logstash)
This change also fixes bug with that breaks DatabaseRegistry and when it tires to download databases after updating timestamp only (GeoIpDownloader checks if there are new databases and updates timestamp because local databases are up to date)
Przemko Robakowski hace 4 años
padre
commit
331a44ba42
Se han modificado 15 ficheros con 415 adiciones y 193 borrados
  1. 4 2
      modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java
  2. 172 77
      modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java
  3. 10 5
      modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java
  4. 0 16
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java
  5. 25 19
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java
  6. 28 4
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java
  7. 30 4
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java
  8. 34 6
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java
  9. 2 2
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java
  10. 37 9
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java
  11. 12 12
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java
  12. 2 2
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java
  13. 22 16
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java
  14. 35 18
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java
  15. 2 1
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java

+ 4 - 2
modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/AbstractGeoIpIT.java

@@ -10,6 +10,7 @@ package org.elasticsearch.ingest.geoip;
 
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.StreamsUtils;
@@ -21,7 +22,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 public abstract class AbstractGeoIpIT extends ESIntegTestCase {
@@ -58,7 +58,9 @@ public abstract class AbstractGeoIpIT extends ESIntegTestCase {
 
         @Override
         public List<Setting<?>> getSettings() {
-            return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
+            return List.of(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope),
+                Setting.timeSetting("ingest.geoip.database_validity", TimeValue.timeValueDays(3), Setting.Property.NodeScope,
+                    Setting.Property.Dynamic));
         }
     }
 }

+ 172 - 77
modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

@@ -9,18 +9,20 @@
 package org.elasticsearch.ingest.geoip;
 
 import com.maxmind.geoip2.DatabaseReader;
+
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
 import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
 import org.elasticsearch.action.ingest.SimulatePipelineRequest;
 import org.elasticsearch.action.ingest.SimulatePipelineResponse;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.MatchQueryBuilder;
@@ -53,10 +55,13 @@ import java.util.stream.StreamSupport;
 import java.util.zip.GZIPInputStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
 public class GeoIpDownloaderIT extends AbstractGeoIpIT {
@@ -78,14 +83,85 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
     }
 
     @After
-    public void disableDownloader() {
+    public void cleanUp() {
         ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster()
             .prepareUpdateSettings()
-            .setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), (String) null))
+            .setPersistentSettings(Settings.builder()
+                .put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), (String) null)
+                .put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), (String) null)
+                .put("ingest.geoip.database_validity", (String) null))
             .get();
         assertTrue(settingsResponse.isAcknowledged());
     }
 
+    public void testInvalidTimestamp() throws Exception {
+        assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
+        ClusterUpdateSettingsResponse settingsResponse =
+            client().admin().cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(Settings.builder()
+                    .put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true))
+                .get();
+        assertTrue(settingsResponse.isAcknowledged());
+        assertBusy(() -> {
+            GeoIpTaskState state = getGeoIpTaskState();
+            assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
+        }, 2, TimeUnit.MINUTES);
+
+        putPipeline();
+        verifyUpdatedDatabase();
+
+        settingsResponse =
+            client().admin().cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(Settings.builder()
+                    .put("ingest.geoip.database_validity", TimeValue.timeValueMillis(1)))
+                .get();
+        assertTrue(settingsResponse.isAcknowledged());
+        Thread.sleep(10);
+        settingsResponse = client().admin().cluster()
+            .prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2)))
+            .get();
+        assertTrue(settingsResponse.isAcknowledged());
+        List<Path> geoIpTmpDirs = getGeoIpTmpDirs();
+        assertBusy(() -> {
+            for (Path geoIpTmpDir : geoIpTmpDirs) {
+                try (Stream<Path> files = Files.list(geoIpTmpDir)) {
+                    Set<String> names = files.map(f -> f.getFileName().toString()).collect(Collectors.toSet());
+                    assertThat(names, not(hasItem("GeoLite2-ASN.mmdb")));
+                    assertThat(names, not(hasItem("GeoLite2-City.mmdb")));
+                    assertThat(names, not(hasItem("GeoLite2-Country.mmdb")));
+                }
+            }
+        });
+        putPipeline();
+        assertBusy(() -> {
+            SimulateDocumentBaseResult result = simulatePipeline();
+            assertThat(result.getFailure(), nullValue());
+            assertTrue(result.getIngestDocument().hasField("tags"));
+            @SuppressWarnings("unchecked")
+            List<String> tags = result.getIngestDocument().getFieldValue("tags", List.class);
+            assertThat(tags, contains("_geoip_expired_database"));
+            assertFalse(result.getIngestDocument().hasField("ip-city"));
+            assertFalse(result.getIngestDocument().hasField("ip-asn"));
+            assertFalse(result.getIngestDocument().hasField("ip-country"));
+        });
+    }
+
+    public void testUpdatedTimestamp() throws Exception {
+        assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
+        testGeoIpDatabasesDownload();
+        long lastCheck = getGeoIpTaskState().getDatabases().get("GeoLite2-ASN.mmdb").getLastCheck();
+        ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster()
+            .prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put(GeoIpDownloader.POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueDays(2)))
+            .get();
+        assertTrue(settingsResponse.isAcknowledged());
+        assertBusy(() -> assertNotEquals(lastCheck, getGeoIpTaskState().getDatabases().get("GeoLite2-ASN.mmdb").getLastCheck()));
+        testGeoIpDatabasesDownload();
+    }
+
     public void testGeoIpDatabasesDownload() throws Exception {
         ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster()
             .prepareUpdateSettings()
@@ -93,10 +169,7 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
             .get();
         assertTrue(settingsResponse.isAcknowledged());
         assertBusy(() -> {
-            PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
-            assertNotNull(task);
-            GeoIpTaskState state = (GeoIpTaskState) task.getState();
-            assertNotNull(state);
+            GeoIpTaskState state = getGeoIpTaskState();
             assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
         }, 2, TimeUnit.MINUTES);
 
@@ -150,6 +223,95 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
     public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
         assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
         // setup:
+        putPipeline();
+
+        // verify before updating dbs
+        {
+            SimulateDocumentBaseResult result = simulatePipeline();
+            assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Tumba"));
+            assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
+            assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
+        }
+
+        // Enable downloader:
+        Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
+
+        final List<Path> geoipTmpDirs = getGeoIpTmpDirs();
+        assertBusy(() -> {
+            for (Path geoipTmpDir : geoipTmpDirs) {
+                try (Stream<Path> list = Files.list(geoipTmpDir)) {
+                    List<String> files = list.map(Path::getFileName).map(Path::toString).collect(Collectors.toList());
+                    assertThat(files, containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb",
+                        "GeoLite2-City.mmdb_COPYRIGHT.txt", "GeoLite2-Country.mmdb_COPYRIGHT.txt", "GeoLite2-ASN.mmdb_COPYRIGHT.txt",
+                        "GeoLite2-City.mmdb_LICENSE.txt", "GeoLite2-Country.mmdb_LICENSE.txt", "GeoLite2-ASN.mmdb_LICENSE.txt",
+                        "GeoLite2-ASN.mmdb_README.txt"));
+                }
+            }
+        });
+
+        verifyUpdatedDatabase();
+
+        // Disable downloader:
+        settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false);
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
+
+        assertBusy(() -> {
+            for (Path geoipTmpDir : geoipTmpDirs) {
+                try (Stream<Path> list = Files.list(geoipTmpDir)) {
+                    List<String> files = list.map(Path::toString).filter(p -> p.endsWith(".mmdb")).collect(Collectors.toList());
+                    assertThat(files, empty());
+                }
+            }
+        });
+    }
+
+    private void verifyUpdatedDatabase() throws Exception {
+        assertBusy(() -> {
+            SimulateDocumentBaseResult result = simulatePipeline();
+            assertThat(result.getFailure(), nullValue());
+            assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping"));
+            assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
+            assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
+        });
+    }
+
+    private GeoIpTaskState getGeoIpTaskState() {
+        PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
+        assertNotNull(task);
+        GeoIpTaskState state = (GeoIpTaskState) task.getState();
+        assertNotNull(state);
+        return state;
+    }
+
+    private SimulateDocumentBaseResult simulatePipeline() throws IOException {
+        BytesReference bytes;
+        try (XContentBuilder builder = JsonXContent.contentBuilder()) {
+            builder.startObject();
+            builder.startArray("docs");
+            {
+                builder.startObject();
+                builder.field("_index", "my-index");
+                {
+                    builder.startObject("_source");
+                    builder.field("ip", "89.160.20.128");
+                    builder.endObject();
+                }
+                builder.endObject();
+            }
+            builder.endArray();
+            builder.endObject();
+            bytes = BytesReference.bytes(builder);
+        }
+        SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(bytes, XContentType.JSON);
+        simulateRequest.setId("_id");
+        SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet();
+        assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
+        assertThat(simulateResponse.getResults().size(), equalTo(1));
+        return (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
+    }
+
+    private void putPipeline() throws IOException {
         BytesReference bytes;
         try (XContentBuilder builder = JsonXContent.contentBuilder()) {
             builder.startObject();
@@ -196,41 +358,9 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
             bytes = BytesReference.bytes(builder);
         }
         assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get());
+    }
 
-        // verify before updating dbs
-        try (XContentBuilder builder = JsonXContent.contentBuilder()) {
-            builder.startObject();
-            builder.startArray("docs");
-            {
-                builder.startObject();
-                builder.field("_index", "my-index");
-                {
-                    builder.startObject("_source");
-                    builder.field("ip", "89.160.20.128");
-                    builder.endObject();
-                }
-                builder.endObject();
-            }
-            builder.endArray();
-            builder.endObject();
-            bytes = BytesReference.bytes(builder);
-        }
-        SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(bytes, XContentType.JSON);
-        simulateRequest.setId("_id");
-        {
-            SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet();
-            assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
-            assertThat(simulateResponse.getResults().size(), equalTo(1));
-            SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
-            assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Tumba"));
-            assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
-            assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
-        }
-
-        // Enable downloader:
-        Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
-        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
-
+    private List<Path> getGeoIpTmpDirs() throws IOException {
         final Set<String> ids = StreamSupport.stream(clusterService().state().nodes().getDataNodes().values().spliterator(), false)
             .map(c -> c.value.getId())
             .collect(Collectors.toSet());
@@ -242,42 +372,7 @@ public class GeoIpDownloaderIT extends AbstractGeoIpIT {
             geoipTmpDirs = files.filter(path -> ids.contains(path.getFileName().toString())).collect(Collectors.toList());
         }
         assertThat(geoipTmpDirs.size(), equalTo(internalCluster().numDataNodes()));
-        assertBusy(() -> {
-            for (Path geoipTmpDir : geoipTmpDirs) {
-                try (Stream<Path> list = Files.list(geoipTmpDir)) {
-                    List<String> files = list.map(Path::getFileName).map(Path::toString).collect(Collectors.toList());
-                    assertThat(files, containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb",
-                        "GeoLite2-City.mmdb_COPYRIGHT.txt", "GeoLite2-Country.mmdb_COPYRIGHT.txt", "GeoLite2-ASN.mmdb_COPYRIGHT.txt",
-                        "GeoLite2-City.mmdb_LICENSE.txt", "GeoLite2-Country.mmdb_LICENSE.txt", "GeoLite2-ASN.mmdb_LICENSE.txt",
-                        "GeoLite2-ASN.mmdb_README.txt"));
-                }
-            }
-        });
-
-        // Verify after updating dbs:
-        assertBusy(() -> {
-            SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet();
-            assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
-            assertThat(simulateResponse.getResults().size(), equalTo(1));
-            SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
-            assertThat(result.getFailure(), nullValue());
-            assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping"));
-            assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
-            assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
-        });
-
-        // Disable downloader:
-        settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false);
-        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
-
-        assertBusy(() -> {
-            for (Path geoipTmpDir : geoipTmpDirs) {
-                try (Stream<Path> list = Files.list(geoipTmpDir)) {
-                    List<String> files = list.map(Path::toString).filter(p -> p.endsWith(".mmdb")).collect(Collectors.toList());
-                    assertThat(files, empty());
-                }
-            }
-        });
+        return geoipTmpDirs;
     }
 
     @SuppressForbidden(reason = "Maxmind API requires java.io.File")

+ 10 - 5
modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java

@@ -10,6 +10,8 @@ package org.elasticsearch.ingest.geoip;
 
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -41,6 +43,7 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") // Don't randomly add 'extra' files to directory.
 public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
@@ -57,13 +60,15 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
         Path geoIpConfigDir = createTempDir();
         Path geoIpTmpDir = createTempDir();
         DatabaseRegistry databaseRegistry = createRegistry(geoIpModulesDir, geoIpConfigDir, geoIpTmpDir);
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
             geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
             geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
-        databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
-        databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
+        databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
+        databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
         lazyLoadReaders(databaseRegistry);
 
         final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
@@ -116,13 +121,13 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
                     } else {
                         Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
                             geoIpTmpDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING);
-                        databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"), 0);
+                        databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
                     }
                     DatabaseReaderLazyLoader previous2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");
                     InputStream source = LocalDatabases.class.getResourceAsStream(i % 2 == 0 ? "/GeoIP2-City-Test.mmdb" :
                         "/GeoLite2-City-Test.mmdb");
                     Files.copy(source, geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING);
-                    databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
+                    databaseRegistry.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
 
                     DatabaseReaderLazyLoader current1 = databaseRegistry.get("GeoLite2-City.mmdb");
                     DatabaseReaderLazyLoader current2 = databaseRegistry.get("GeoLite2-City-Test.mmdb");

+ 0 - 16
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

@@ -24,7 +24,6 @@ import org.elasticsearch.core.Booleans;
 import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.core.SuppressForbidden;
-import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.core.internal.io.IOUtils;
 
 import java.io.Closeable;
@@ -36,7 +35,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
-import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -55,7 +53,6 @@ class DatabaseReaderLazyLoader implements Closeable {
     private final GeoIpCache cache;
     private final Path databasePath;
     private final CheckedSupplier<DatabaseReader, IOException> loader;
-    private volatile long lastUpdate;
     final SetOnce<DatabaseReader> databaseReader;
 
     // cache the database type so that we do not re-read it on every pipeline execution
@@ -197,16 +194,6 @@ class DatabaseReaderLazyLoader implements Closeable {
     }
 
     DatabaseReader get() throws IOException {
-        //only downloaded databases will have lastUpdate != 0, we never update it for default databases or databases from config dir
-        if (lastUpdate != 0) {
-            Path fileName = databasePath.getFileName();
-            if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(30).toMillis()) {
-                throw new IllegalStateException("database [" + fileName + "] was not updated for 30 days and is disabled");
-            } else if (System.currentTimeMillis() - lastUpdate > Duration.ofDays(25).toMillis()) {
-                HeaderWarning.addWarning(
-                    "database [{}] was not updated for over 25 days, ingestion will fail if there is no update for 30 days", fileName);
-            }
-        }
         if (databaseReader.get() == null) {
             synchronized (databaseReader) {
                 if (databaseReader.get() == null) {
@@ -261,7 +248,4 @@ class DatabaseReaderLazyLoader implements Closeable {
         return new DatabaseReader.Builder(databasePath.toFile());
     }
 
-    void setLastUpdate(long lastUpdate) {
-        this.lastUpdate = lastUpdate;
-    }
 }

+ 25 - 19
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java

@@ -48,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -202,25 +203,31 @@ public final class DatabaseRegistry implements Closeable {
         // Empty state will purge stale entries in databases map.
         GeoIpTaskState taskState = task == null || task.getState() == null ? GeoIpTaskState.EMPTY : (GeoIpTaskState) task.getState();
 
-        taskState.getDatabases().forEach((name, metadata) -> {
-            DatabaseReaderLazyLoader reference = databases.get(name);
-            String remoteMd5 = metadata.getMd5();
-            String localMd5 = reference != null ? reference.getMd5() : null;
-            if (Objects.equals(localMd5, remoteMd5)) {
-                reference.setLastUpdate(metadata.getLastUpdate());
-                LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
-                return;
-            }
+        taskState.getDatabases().entrySet().stream()
+            .filter(e -> e.getValue().isValid(state.getMetadata().settings()))
+            .forEach(e -> {
+                String name = e.getKey();
+                GeoIpTaskState.Metadata metadata = e.getValue();
+                DatabaseReaderLazyLoader reference = databases.get(name);
+                String remoteMd5 = metadata.getMd5();
+                String localMd5 = reference != null ? reference.getMd5() : null;
+                if (Objects.equals(localMd5, remoteMd5)) {
+                    LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
+                    return;
+                }
 
-            try {
-                retrieveAndUpdateDatabase(name, metadata);
-            } catch (Exception e) {
-                LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), e);
-            }
-        });
+                try {
+                    retrieveAndUpdateDatabase(name, metadata);
+                } catch (Exception ex) {
+                    LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("attempt to download database [{}] failed", name), ex);
+                }
+            });
 
         List<String> staleEntries = new ArrayList<>(databases.keySet());
-        staleEntries.removeAll(taskState.getDatabases().keySet());
+        staleEntries.removeAll(taskState.getDatabases().entrySet().stream()
+            .filter(e->e.getValue().isValid(state.getMetadata().settings()))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet()));
         removeStaleEntries(staleEntries);
     }
 
@@ -284,7 +291,7 @@ public final class DatabaseRegistry implements Closeable {
 
                 LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
                 Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
-                updateDatabase(databaseName, recordedMd5, databaseFile, metadata.getLastUpdate());
+                updateDatabase(databaseName, recordedMd5, databaseFile);
                 Files.delete(databaseTmpGzFile);
             },
             failure -> {
@@ -299,11 +306,10 @@ public final class DatabaseRegistry implements Closeable {
             });
     }
 
-    void updateDatabase(String databaseFileName, String recordedMd5, Path file, long lastUpdate) {
+    void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
         try {
             LOGGER.info("database file changed [{}], reload database...", file);
             DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
-            loader.setLastUpdate(lastUpdate);
             DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
             if (existing != null) {
                 existing.close();

+ 28 - 4
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

@@ -22,11 +22,11 @@ import org.elasticsearch.common.hash.MessageDigests;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.MatchQueryBuilder;
 import org.elasticsearch.index.query.RangeQueryBuilder;
@@ -70,6 +70,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
 
     private final Client client;
     private final HttpClient httpClient;
+    private final ClusterService clusterService;
     private final ThreadPool threadPool;
     private final String endpoint;
 
@@ -84,6 +85,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
         super(id, type, action, description, parentTask, headers);
         this.httpClient = httpClient;
         this.client = new OriginSettingClient(client, IngestService.INGEST_ORIGIN);
+        this.clusterService = clusterService;
         this.threadPool = threadPool;
         endpoint = ENDPOINT_SETTING.get(settings);
         pollInterval = POLL_INTERVAL_SETTING.get(settings);
@@ -139,7 +141,7 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
             int firstChunk = state.contains(name) ? state.get(name).getLastChunk() + 1 : 0;
             int lastChunk = indexChunks(name, is, firstChunk, md5, start);
             if (lastChunk > firstChunk) {
-                state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5));
+                state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start));
                 updateTaskState();
                 stats = stats.successfulDownload(System.currentTimeMillis() - start).count(state.getDatabases().size());
                 logger.info("updated geoip database [" + name + "]");
@@ -166,7 +168,8 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
     //visible for testing
     protected void updateTimestamp(String name, Metadata old) {
         logger.info("geoip database [" + name + "] is up to date, updated timestamp");
-        state = state.put(name, new Metadata(System.currentTimeMillis(), old.getFirstChunk(), old.getLastChunk(), old.getMd5()));
+        state = state.put(name, new Metadata(old.getLastUpdate(), old.getFirstChunk(), old.getLastChunk(), old.getMd5(),
+            System.currentTimeMillis()));
         stats = stats.skippedDownload();
         updateTaskState();
     }
@@ -235,9 +238,28 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
         } catch (Exception e) {
             logger.error("exception during geoip databases update", e);
         }
+        try {
+            cleanDatabases();
+        } catch (Exception e) {
+            logger.error("exception during geoip databases cleanup", e);
+        }
         scheduleNextRun(pollInterval);
     }
 
+    private void cleanDatabases() {
+        long expiredDatabases = state.getDatabases().entrySet().stream()
+            .filter(e -> e.getValue().isValid(clusterService.state().metadata().settings()) == false)
+            .peek(e -> {
+                String name = e.getKey();
+                Metadata meta = e.getValue();
+                deleteOldChunks(name, meta.getLastChunk() + 1);
+                state = state.put(name, new Metadata(meta.getLastUpdate(), meta.getFirstChunk(), meta.getLastChunk(), meta.getMd5(),
+                    meta.getLastCheck() - 1));
+                updateTaskState();
+            }).count();
+        stats = stats.expiredDatabases((int) expiredDatabases);
+    }
+
     @Override
     protected void onCancelled() {
         if (scheduled != null) {
@@ -251,6 +273,8 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
     }
 
     private void scheduleNextRun(TimeValue time) {
-        scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
+        if (threadPool.scheduler().isShutdown() == false) {
+            scheduled = threadPool.schedule(this::runDownloader, time, ThreadPool.Names.GENERIC);
+        }
     }
 }

+ 30 - 4
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

@@ -19,12 +19,15 @@ import com.maxmind.geoip2.record.Location;
 import com.maxmind.geoip2.record.Subdivision;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.ingest.AbstractProcessor;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -37,11 +40,13 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
 import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
 import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
 import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
+import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTaskWithId;
 
 public final class GeoIpProcessor extends AbstractProcessor {
 
@@ -51,6 +56,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
     private static final String ASN_DB_SUFFIX = "-ASN";
 
     private final String field;
+    private final Supplier<Boolean> isValid;
     private final String targetField;
     private final CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier;
     private final Set<Property> properties;
@@ -59,10 +65,11 @@ public final class GeoIpProcessor extends AbstractProcessor {
 
     /**
      * Construct a geo-IP processor.
-     *  @param tag           the processor tag
+     * @param tag           the processor tag
      * @param description   the processor description
      * @param field         the source field to geo-IP map
      * @param supplier    a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
+     * @param isValid
      * @param targetField   the target field
      * @param properties    the properties; ideally this is lazily-loaded once on first use
      * @param ignoreMissing true if documents with a missing value for the field should be ignored
@@ -73,12 +80,14 @@ public final class GeoIpProcessor extends AbstractProcessor {
         final String description,
         final String field,
         final CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier,
+        final Supplier<Boolean> isValid,
         final String targetField,
         final Set<Property> properties,
         final boolean ignoreMissing,
         final boolean firstOnly) {
         super(tag, description);
         this.field = field;
+        this.isValid = isValid;
         this.targetField = targetField;
         this.supplier = supplier;
         this.properties = properties;
@@ -94,7 +103,10 @@ public final class GeoIpProcessor extends AbstractProcessor {
     public IngestDocument execute(IngestDocument ingestDocument) throws IOException {
         Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
 
-        if (ip == null && ignoreMissing) {
+        if (isValid.get() == false) {
+            ingestDocument.appendFieldValue("tags","_geoip_expired_database", false);
+            return ingestDocument;
+        } else if (ip == null && ignoreMissing) {
             return ingestDocument;
         } else if (ip == null) {
             throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
@@ -342,13 +354,15 @@ public final class GeoIpProcessor extends AbstractProcessor {
         ));
 
         private final DatabaseRegistry databaseRegistry;
+        private final ClusterService clusterService;
 
         List<DatabaseReaderLazyLoader> getAllDatabases() {
             return databaseRegistry.getAllDatabases();
         }
 
-        public Factory(DatabaseRegistry databaseRegistry) {
+        public Factory(DatabaseRegistry databaseRegistry, ClusterService clusterService) {
             this.databaseRegistry = databaseRegistry;
+            this.clusterService = clusterService;
         }
 
         @Override
@@ -413,7 +427,19 @@ public final class GeoIpProcessor extends AbstractProcessor {
                     "] doesn't match with expected suffix [" + expectedSuffix + "]";
                 return loader;
             };
-            return new GeoIpProcessor(processorTag, description, ipField, supplier, targetField, properties, ignoreMissing, firstOnly);
+            Supplier<Boolean> isValid = () -> {
+                ClusterState currentState = clusterService.state();
+                assert currentState != null;
+
+                PersistentTask<?> task = getTaskWithId(currentState, GeoIpDownloader.GEOIP_DOWNLOADER);
+                if (task == null || task.getState() == null) {
+                    return true;
+                }
+                GeoIpTaskState state = (GeoIpTaskState) task.getState();
+                return state.getDatabases().get(databaseFile).isValid(currentState.metadata().settings());
+            };
+            return new GeoIpProcessor(processorTag, description, ipField, supplier, isValid, targetField, properties, ignoreMissing,
+                firstOnly);
         }
     }
 

+ 34 - 6
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java

@@ -9,18 +9,22 @@
 package org.elasticsearch.ingest.geoip;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.common.xcontent.ParseField;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.persistent.PersistentTaskState;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -29,6 +33,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
 import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
 
 class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
@@ -61,7 +66,11 @@ class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
 
     GeoIpTaskState(StreamInput input) throws IOException {
         databases = Collections.unmodifiableMap(input.readMap(StreamInput::readString,
-            in -> new Metadata(in.readLong(), in.readVInt(), in.readVInt(), in.readString())));
+            in -> {
+                long lastUpdate = in.readLong();
+                return new Metadata(lastUpdate, in.readVInt(), in.readVInt(), in.readString(),
+                    in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readLong() : lastUpdate);
+            }));
     }
 
     public GeoIpTaskState put(String name, Metadata metadata) {
@@ -126,12 +135,16 @@ class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
             o.writeVInt(v.firstChunk);
             o.writeVInt(v.lastChunk);
             o.writeString(v.md5);
+            if (o.getVersion().onOrAfter(Version.V_8_0_0)) {
+                o.writeLong(v.lastCheck);
+            }
         });
     }
 
     static class Metadata implements ToXContentObject {
 
         static final String NAME = GEOIP_DOWNLOADER + "-metadata";
+        private static final ParseField LAST_CHECK = new ParseField("last_check");
         private static final ParseField LAST_UPDATE = new ParseField("last_update");
         private static final ParseField FIRST_CHUNK = new ParseField("first_chunk");
         private static final ParseField LAST_CHUNK = new ParseField("last_chunk");
@@ -139,13 +152,15 @@ class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
 
         private static final ConstructingObjectParser<Metadata, Void> PARSER =
             new ConstructingObjectParser<>(NAME, true,
-                args -> new Metadata((long) args[0], (int) args[1], (int) args[2], (String) args[3]));
+                args -> new Metadata((long) args[0], (int) args[1], (int) args[2], (String) args[3], (long) (args[4] == null ? args[0] :
+                    args[4])));
 
         static {
             PARSER.declareLong(constructorArg(), LAST_UPDATE);
             PARSER.declareInt(constructorArg(), FIRST_CHUNK);
             PARSER.declareInt(constructorArg(), LAST_CHUNK);
             PARSER.declareString(constructorArg(), MD5);
+            PARSER.declareLong(optionalConstructorArg(), LAST_CHECK);
         }
 
         public static Metadata fromXContent(XContentParser parser) {
@@ -160,18 +175,25 @@ class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
         private final int firstChunk;
         private final int lastChunk;
         private final String md5;
+        private final long lastCheck;
 
-        Metadata(long lastUpdate, int firstChunk, int lastChunk, String md5) {
+        Metadata(long lastUpdate, int firstChunk, int lastChunk, String md5, long lastCheck) {
             this.lastUpdate = lastUpdate;
             this.firstChunk = firstChunk;
             this.lastChunk = lastChunk;
             this.md5 = Objects.requireNonNull(md5);
+            this.lastCheck = lastCheck;
         }
 
         public long getLastUpdate() {
             return lastUpdate;
         }
 
+        public boolean isValid(Settings settings) {
+            TimeValue valid = settings.getAsTime("ingest.geoip.database_validity", TimeValue.timeValueDays(30));
+            return Instant.ofEpochMilli(lastCheck).isAfter(Instant.now().minus(valid.getMillis(), ChronoUnit.MILLIS));
+        }
+
         public int getFirstChunk() {
             return firstChunk;
         }
@@ -184,6 +206,10 @@ class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
             return md5;
         }
 
+        public long getLastCheck() {
+            return lastCheck;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
@@ -192,12 +218,13 @@ class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
             return lastUpdate == metadata.lastUpdate
                 && firstChunk == metadata.firstChunk
                 && lastChunk == metadata.lastChunk
+                && lastCheck == metadata.lastCheck
                 && md5.equals(metadata.md5);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(lastUpdate, firstChunk, lastChunk, md5);
+            return Objects.hash(lastUpdate, firstChunk, lastChunk, md5, lastCheck);
         }
 
         @Override
@@ -205,6 +232,7 @@ class GeoIpTaskState implements PersistentTaskState, VersionedNamedWriteable {
             builder.startObject();
             {
                 builder.field(LAST_UPDATE.getPreferredName(), lastUpdate);
+                builder.field(LAST_CHECK.getPreferredName(), lastCheck);
                 builder.field(FIRST_CHUNK.getPreferredName(), firstChunk);
                 builder.field(LAST_CHUNK.getPreferredName(), lastChunk);
                 builder.field(MD5.getPreferredName(), md5);

+ 2 - 2
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java

@@ -91,7 +91,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd
         GeoIpCache geoIpCache = new GeoIpCache(cacheSize);
         DatabaseRegistry registry = new DatabaseRegistry(parameters.env, parameters.client, geoIpCache, parameters.genericExecutor);
         databaseRegistry.set(registry);
-        return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry));
+        return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry, parameters.ingestService.getClusterService()));
     }
 
     @Override
@@ -113,7 +113,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd
             throw new UncheckedIOException(e);
         }
 
-        if(GeoIpDownloaderTaskExecutor.ENABLED_DEFAULT == false){
+        if (GeoIpDownloaderTaskExecutor.ENABLED_DEFAULT == false) {
             return List.of(databaseRegistry.get());
         }
 

+ 37 - 9
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpDownloaderStats.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.ingest.geoip.stats;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -23,16 +24,18 @@ import java.util.Objects;
 
 public class GeoIpDownloaderStats implements Task.Status {
 
-    public static final GeoIpDownloaderStats EMPTY = new GeoIpDownloaderStats(0, 0, 0, 0, 0);
+    public static final GeoIpDownloaderStats EMPTY = new GeoIpDownloaderStats(0, 0, 0, 0, 0, 0);
 
     public static final ConstructingObjectParser<GeoIpDownloaderStats, Void> PARSER = new ConstructingObjectParser<>(
-        "geoip_downloader_stats", a -> new GeoIpDownloaderStats((int) a[0], (int) a[1], (long) a[2], (int) a[3], (int) a[4]));
+        "geoip_downloader_stats", a -> new GeoIpDownloaderStats((int) a[0], (int) a[1], (long) a[2], (int) a[3], (int) a[4],
+        a[5] == null ? 0 : (int) a[5]));
 
     private static final ParseField SUCCESSFUL_DOWNLOADS = new ParseField("successful_downloads");
     private static final ParseField FAILED_DOWNLOADS = new ParseField("failed_downloads");
     private static final ParseField TOTAL_DOWNLOAD_TIME = new ParseField("total_download_time");
     private static final ParseField DATABASES_COUNT = new ParseField("databases_count");
     private static final ParseField SKIPPED_DOWNLOADS = new ParseField("skipped_updates");
+    private static final ParseField EXPIRED_DATABASES = new ParseField("expired_databases");
 
     static {
         PARSER.declareInt(ConstructingObjectParser.constructorArg(), SUCCESSFUL_DOWNLOADS);
@@ -40,6 +43,7 @@ public class GeoIpDownloaderStats implements Task.Status {
         PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_DOWNLOAD_TIME);
         PARSER.declareInt(ConstructingObjectParser.constructorArg(), DATABASES_COUNT);
         PARSER.declareInt(ConstructingObjectParser.constructorArg(), SKIPPED_DOWNLOADS);
+        PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), EXPIRED_DATABASES);
     }
 
     private final int successfulDownloads;
@@ -47,6 +51,7 @@ public class GeoIpDownloaderStats implements Task.Status {
     private final long totalDownloadTime;
     private final int databasesCount;
     private final int skippedDownloads;
+    private final int expiredDatabases;
 
     public GeoIpDownloaderStats(StreamInput in) throws IOException {
         successfulDownloads = in.readVInt();
@@ -54,15 +59,21 @@ public class GeoIpDownloaderStats implements Task.Status {
         totalDownloadTime = in.readVLong();
         databasesCount = in.readVInt();
         skippedDownloads = in.readVInt();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            expiredDatabases = in.readVInt();
+        } else {
+            expiredDatabases = 0;
+        }
     }
 
     private GeoIpDownloaderStats(int successfulDownloads, int failedDownloads, long totalDownloadTime, int databasesCount,
-                                 int skippedDownloads) {
+                                 int skippedDownloads, int expiredDatabases) {
         this.successfulDownloads = successfulDownloads;
         this.failedDownloads = failedDownloads;
         this.totalDownloadTime = totalDownloadTime;
         this.databasesCount = databasesCount;
         this.skippedDownloads = skippedDownloads;
+        this.expiredDatabases = expiredDatabases;
     }
 
     public int getSuccessfulDownloads() {
@@ -85,21 +96,33 @@ public class GeoIpDownloaderStats implements Task.Status {
         return skippedDownloads;
     }
 
+    public int getExpiredDatabases() {
+        return expiredDatabases;
+    }
+
     public GeoIpDownloaderStats skippedDownload() {
-        return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads + 1);
+        return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads + 1,
+            expiredDatabases);
     }
 
     public GeoIpDownloaderStats successfulDownload(long downloadTime) {
         return new GeoIpDownloaderStats(successfulDownloads + 1, failedDownloads, totalDownloadTime + Math.max(downloadTime, 0),
-            databasesCount, skippedDownloads);
+            databasesCount, skippedDownloads, expiredDatabases);
     }
 
     public GeoIpDownloaderStats failedDownload() {
-        return new GeoIpDownloaderStats(successfulDownloads, failedDownloads + 1, totalDownloadTime, databasesCount, skippedDownloads);
+        return new GeoIpDownloaderStats(successfulDownloads, failedDownloads + 1, totalDownloadTime, databasesCount, skippedDownloads,
+            expiredDatabases);
     }
 
     public GeoIpDownloaderStats count(int databasesCount) {
-        return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads);
+        return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads,
+            expiredDatabases);
+    }
+
+    public GeoIpDownloaderStats expiredDatabases(int expiredDatabases) {
+        return new GeoIpDownloaderStats(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads,
+            expiredDatabases);
     }
 
     @Override
@@ -110,6 +133,7 @@ public class GeoIpDownloaderStats implements Task.Status {
         builder.field(TOTAL_DOWNLOAD_TIME.getPreferredName(), totalDownloadTime);
         builder.field(DATABASES_COUNT.getPreferredName(), databasesCount);
         builder.field(SKIPPED_DOWNLOADS.getPreferredName(), skippedDownloads);
+        builder.field(EXPIRED_DATABASES.getPreferredName(), expiredDatabases);
         builder.endObject();
         return builder;
     }
@@ -125,6 +149,9 @@ public class GeoIpDownloaderStats implements Task.Status {
         out.writeVLong(totalDownloadTime);
         out.writeVInt(databasesCount);
         out.writeVInt(skippedDownloads);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeVInt(expiredDatabases);
+        }
     }
 
     @Override
@@ -136,12 +163,13 @@ public class GeoIpDownloaderStats implements Task.Status {
             failedDownloads == that.failedDownloads &&
             totalDownloadTime == that.totalDownloadTime &&
             databasesCount == that.databasesCount &&
-            skippedDownloads == that.skippedDownloads;
+            skippedDownloads == that.skippedDownloads &&
+            expiredDatabases == that.expiredDatabases;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads);
+        return Objects.hash(successfulDownloads, failedDownloads, totalDownloadTime, databasesCount, skippedDownloads, expiredDatabases);
     }
 
     @Override

+ 12 - 12
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java

@@ -80,7 +80,6 @@ import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.Persist
 import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.TYPE;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Matchers.any;
@@ -128,7 +127,8 @@ public class DatabaseRegistryTests extends ESTestCase {
         String md5 = mockSearches("GeoIP2-City.mmdb", 5, 14);
         String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
         PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
-        task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(10L, 5, 14, md5))));
+        task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb",
+            new GeoIpTaskState.Metadata(10, 5, 14, md5, 10))));
         PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
 
         ClusterState state = ClusterState.builder(new ClusterName("name"))
@@ -142,16 +142,14 @@ public class DatabaseRegistryTests extends ESTestCase {
         assertThat(databaseRegistry.getDatabase("GeoIP2-City.mmdb", false), nullValue());
         databaseRegistry.checkDatabases(state);
         DatabaseReaderLazyLoader database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false);
-        assertThat(database, notNullValue());
-        verify(client, times(10)).search(any());
+        assertThat(database, nullValue());
+        verify(client, times(0)).search(any());
         try (Stream<Path> files = Files.list(geoIpTmpDir.resolve("geoip-databases").resolve("nodeId"))) {
-            assertThat(files.collect(Collectors.toList()), hasSize(1));
+            assertEquals(0, files.count());
         }
-        IllegalStateException e = expectThrows(IllegalStateException.class, database::get);
-        assertEquals("database [GeoIP2-City.mmdb] was not updated for 30 days and is disabled", e.getMessage());
 
         task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb",
-            new GeoIpTaskState.Metadata(System.currentTimeMillis(), 5, 14, md5))));
+            new GeoIpTaskState.Metadata(10, 5, 14, md5, System.currentTimeMillis()))));
         tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
 
         state = ClusterState.builder(new ClusterName("name"))
@@ -163,6 +161,8 @@ public class DatabaseRegistryTests extends ESTestCase {
             .build();
         databaseRegistry.checkDatabases(state);
         database = databaseRegistry.getDatabase("GeoIP2-City.mmdb", false);
+        assertThat(database, notNullValue());
+        verify(client, times(10)).search(any());
         //30 days check passed but we mocked mmdb data so parsing will fail
         expectThrows(InvalidDatabaseException.class, database::get);
     }
@@ -171,7 +171,7 @@ public class DatabaseRegistryTests extends ESTestCase {
         String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9);
         String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
         PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
-        task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5))));
+        task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10))));
         PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
 
         ClusterState state = ClusterState.builder(new ClusterName("name"))
@@ -195,7 +195,7 @@ public class DatabaseRegistryTests extends ESTestCase {
         String md5 = mockSearches("GeoIP2-City.mmdb", 0, 9);
         String taskId = GeoIpDownloader.GEOIP_DOWNLOADER;
         PersistentTask<?> task = new PersistentTask<>(taskId, GeoIpDownloader.GEOIP_DOWNLOADER, new GeoIpTaskParams(), 1, null);
-        task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5))));
+        task = new PersistentTask<>(task, new GeoIpTaskState(Map.of("GeoIP2-City.mmdb", new GeoIpTaskState.Metadata(0L, 0, 9, md5, 10))));
         PersistentTasksCustomMetadata tasksCustomMetadata = new PersistentTasksCustomMetadata(1L, Map.of(taskId, task));
 
         ClusterState state = ClusterState.builder(new ClusterName("name"))
@@ -236,7 +236,7 @@ public class DatabaseRegistryTests extends ESTestCase {
 
     public void testRetrieveDatabase() throws Exception {
         String md5 = mockSearches("_name", 0, 29);
-        GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 29, md5);
+        GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 29, md5, 10);
 
         @SuppressWarnings("unchecked")
         CheckedConsumer<byte[], IOException> chunkConsumer = mock(CheckedConsumer.class);
@@ -254,7 +254,7 @@ public class DatabaseRegistryTests extends ESTestCase {
     public void testRetrieveDatabaseCorruption() throws Exception {
         String md5 = mockSearches("_name", 0, 9);
         String incorrectMd5 = "different";
-        GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 9, incorrectMd5);
+        GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(-1, 0, 9, incorrectMd5, 10);
 
         @SuppressWarnings("unchecked")
         CheckedConsumer<byte[], IOException> chunkConsumer = mock(CheckedConsumer.class);

+ 2 - 2
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java

@@ -279,13 +279,13 @@ public class GeoIpDownloaderTests extends ESTestCase {
             }
         };
 
-        geoIpDownloader.setState(GeoIpTaskState.EMPTY.put("test.mmdb", new GeoIpTaskState.Metadata(0, 5, 8, "0")));
+        geoIpDownloader.setState(GeoIpTaskState.EMPTY.put("test.mmdb", new GeoIpTaskState.Metadata(0, 5, 8, "0", 0)));
         geoIpDownloader.processDatabase(Map.of("name", "test.tgz", "url", "http://a.b/t1", "md5_hash", "1"));
     }
 
 
     public void testProcessDatabaseSame() throws IOException {
-        GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(0, 4, 10, "1");
+        GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(0, 4, 10, "1", 0);
         GeoIpTaskState taskState = GeoIpTaskState.EMPTY.put("test.mmdb", metadata);
         ByteArrayInputStream bais = new ByteArrayInputStream(new byte[0]);
         when(httpClient.get("a.b/t1")).thenReturn(bais);

+ 22 - 16
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java

@@ -12,6 +12,8 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.VersionType;
@@ -43,11 +45,13 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class GeoIpProcessorFactoryTests extends ESTestCase {
 
     private Path geoipTmpDir;
     private DatabaseRegistry databaseRegistry;
+    private ClusterService clusterService;
 
     @Before
     public void loadDatabaseReaders() throws IOException {
@@ -62,6 +66,8 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000));
         geoipTmpDir = createTempDir();
         databaseRegistry = new DatabaseRegistry(geoipTmpDir, client, cache, localDatabases, Runnable::run);
+        clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
     }
 
     @After
@@ -71,7 +77,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildDefaults() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -87,7 +93,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testSetIgnoreMissing() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -104,7 +110,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCountryBuildDefaults() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -122,7 +128,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testAsnBuildDefaults() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -140,7 +146,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildTargetField() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("target_field", "_field");
@@ -151,7 +157,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildDbFile() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-Country.mmdb");
@@ -164,7 +170,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildWithCountryDbAndAsnFields() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-Country.mmdb");
@@ -178,7 +184,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildWithAsnDbAndCityFields() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-ASN.mmdb");
@@ -192,7 +198,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildNonExistingDbFile() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -202,7 +208,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildFields() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
         Set<GeoIpProcessor.Property> properties = EnumSet.noneOf(GeoIpProcessor.Property.class);
         List<String> fieldNames = new ArrayList<>();
@@ -226,7 +232,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildIllegalFieldOption() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
 
         Map<String, Object> config1 = new HashMap<>();
         config1.put("field", "_field");
@@ -256,7 +262,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         GeoIpCache cache = new GeoIpCache(1000);
         LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, cache);
         DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run);
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
             assertNull(lazyLoader.databaseReader.get());
         }
@@ -319,7 +325,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         GeoIpCache cache = new GeoIpCache(1000);
         DatabaseRegistry databaseRegistry = new DatabaseRegistry(createTempDir(), client, cache, localDatabases, Runnable::run);
         databaseRegistry.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
             assertNull(lazyLoader.databaseReader.get());
         }
@@ -342,7 +348,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testFallbackUsingDefaultDatabases() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         {
             Map<String, Object> config = new HashMap<>();
             config.put("field", "source_field");
@@ -363,7 +369,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
 
     public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception {
         copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb");
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
         // fallback_to_default_databases=true, first use default city db then a custom city db:
         {
             Map<String, Object> config = new HashMap<>();
@@ -379,7 +385,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
             Map<?, ?> geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");
             assertThat(geoData.get("city_name"), equalTo("Tumba"));
 
-            databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"), 0);
+            databaseRegistry.updateDatabase("GeoLite2-City.mmdb", "md5", geoipTmpDir.resolve("GeoLite2-City-Test.mmdb"));
             ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
             processor.execute(ingestDocument);
             geoData = (Map<?, ?>) ingestDocument.getSourceAndMetadata().get("geoip");

+ 35 - 18
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.ingest.geoip;
 
 import com.maxmind.geoip2.DatabaseReader;
+
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.ingest.IngestDocument;
@@ -28,6 +29,7 @@ import java.util.function.Supplier;
 import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
@@ -35,7 +37,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testCity() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", "8.8.8.8");
@@ -59,7 +61,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testNullValueWithIgnoreMissing() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false);
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
             Collections.singletonMap("source_field", null));
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
@@ -69,7 +71,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testNonExistentWithIgnoreMissing() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, false);
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
         processor.execute(ingestDocument);
@@ -78,7 +80,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testNullWithoutIgnoreMissing() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
             Collections.singletonMap("source_field", null));
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
@@ -88,7 +90,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testNonExistentWithoutIgnoreMissing() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
         Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
@@ -97,7 +99,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testCity_withIpV6() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         String address = "2602:306:33d3:8000::3257:9652";
         Map<String, Object> document = new HashMap<>();
@@ -125,7 +127,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testCityWithMissingLocation() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", "80.231.5.0");
@@ -141,7 +143,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testCountry() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-Country.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", "82.170.213.79");
@@ -160,7 +162,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testCountryWithMissingLocation() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-Country.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", "80.231.5.0");
@@ -177,7 +179,7 @@ public class GeoIpProcessorTests extends ESTestCase {
     public void testAsn() throws Exception {
         String ip = "82.171.64.0";
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-ASN.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-ASN.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", ip);
@@ -196,7 +198,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testAddressIsNotInTheDatabase() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", "127.0.0.1");
@@ -205,10 +207,12 @@ public class GeoIpProcessorTests extends ESTestCase {
         assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false));
     }
 
-    /** Don't silently do DNS lookups or anything trappy on bogus data */
+    /**
+     * Don't silently do DNS lookups or anything trappy on bogus data
+     */
     public void testInvalid() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-                loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", "www.google.com");
@@ -219,7 +223,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testListAllValid() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-            loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", Arrays.asList("8.8.8.8", "82.171.64.0"));
@@ -239,7 +243,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testListPartiallyValid() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-            loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1"));
@@ -259,7 +263,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testListNoMatches() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-            loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, false);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.1"));
@@ -271,7 +275,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testListFirstOnly() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-            loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1"));
@@ -289,7 +293,19 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testListFirstOnlyNoMatches() throws Exception {
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
-            loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true);
+            loader("/GeoLite2-City.mmdb"), () -> true, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true);
+
+        Map<String, Object> document = new HashMap<>();
+        document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.2"));
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        processor.execute(ingestDocument);
+
+        assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false));
+    }
+
+    public void testInvalidDatabase() throws Exception {
+        GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field",
+            loader("/GeoLite2-City.mmdb"), () -> false, "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, true);
 
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.2"));
@@ -297,6 +313,7 @@ public class GeoIpProcessorTests extends ESTestCase {
         processor.execute(ingestDocument);
 
         assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false));
+        assertThat(ingestDocument.getSourceAndMetadata(), hasEntry("tags", List.of("_geoip_expired_database")));
     }
 
     private CheckedSupplier<DatabaseReaderLazyLoader, IOException> loader(final String path) {

+ 2 - 1
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpTaskStateSerializationTests.java

@@ -30,7 +30,8 @@ public class GeoIpTaskStateSerializationTests extends AbstractSerializingTestCas
         GeoIpTaskState state = GeoIpTaskState.EMPTY;
         int databaseCount = randomInt(20);
         for (int i = 0; i < databaseCount; i++) {
-            GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(randomLong(), randomInt(), randomInt(), randomAlphaOfLength(32));
+            GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(randomLong(), randomInt(), randomInt(),
+                randomAlphaOfLength(32), randomLong());
             state = state.put(randomAlphaOfLengthBetween(5, 10), metadata);
         }
         return state;