Browse Source

Allow custom geoip databases to be updated at runtime. (#68901)

Custom geoip databases can be provided via the config/ingest-geoip directory,
which are loaded at node startup time. This change adds the functionality
that reloads custom databases at runtime.

Added reference counting when getting a DatabaseReaderLazyLoader instance,
this to avoid closing a maxmind db reader while it is still being used.
There is a small window of time where this might happen during a database update.

A DatabaseReaderLazyLoader instance (which wraps a Maxmind db reader) from config database directory:
* Is closed immediately if there are no usages of it by any geoip processor instance as part of the database reload.
* When there are usages, then it is not closed immediately after a database reload. It is closed by the caller that did the last geoip lookup using this DatabaseReaderLazyLoader instance.
Martijn van Groningen 4 years ago
parent
commit
683a14c504
17 changed files with 844 additions and 216 deletions
  1. 4 0
      modules/ingest-geoip/build.gradle
  2. 0 0
      modules/ingest-geoip/qa/build.gradle
  3. 23 0
      modules/ingest-geoip/qa/file-based-update/build.gradle
  4. 56 0
      modules/ingest-geoip/qa/file-based-update/src/test/java/org/elasticsearch/ingest/geoip/UpdateDatabasesIT.java
  5. BIN
      modules/ingest-geoip/qa/file-based-update/src/test/resources/GeoLite2-City-Test.mmdb
  6. 1 1
      modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java
  7. 164 0
      modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java
  8. 55 2
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java
  9. 17 0
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java
  10. 61 41
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java
  11. 37 104
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java
  12. 212 0
      modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java
  13. 39 65
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java
  14. 5 3
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java
  15. 170 0
      modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java
  16. BIN
      modules/ingest-geoip/src/test/resources/GeoIP2-City-Test.mmdb
  17. BIN
      modules/ingest-geoip/src/test/resources/GeoLite2-City-Test.mmdb

+ 4 - 0
modules/ingest-geoip/build.gradle

@@ -72,3 +72,7 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) {
     systemProperty 'es.geoip.load_db_on_heap', 'true'
   }
 }
+
+tasks.named("forbiddenPatterns").configure {
+  exclude '**/*.mmdb'
+}

+ 0 - 0
modules/ingest-geoip/qa/build.gradle


+ 23 - 0
modules/ingest-geoip/qa/file-based-update/build.gradle

@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+apply plugin: 'elasticsearch.standalone-rest-test'
+apply plugin: 'elasticsearch.rest-test'
+
+testClusters.all {
+  testDistribution = 'DEFAULT'
+}
+
+tasks.named("integTest").configure {
+  systemProperty 'tests.security.manager', 'false' // Allows the test the add databases to config directory.
+  nonInputProperties.systemProperty 'tests.config.dir', "${-> testClusters.integTest.singleNode().getConfigDir()}"
+}
+
+tasks.named("forbiddenPatterns").configure {
+  exclude '**/*.mmdb'
+}

+ 56 - 0
modules/ingest-geoip/qa/file-based-update/src/test/java/org/elasticsearch/ingest/geoip/UpdateDatabasesIT.java

@@ -0,0 +1,56 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.ingest.geoip;
+
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.xcontent.ObjectPath;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.test.rest.ESRestTestCase;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class UpdateDatabasesIT extends ESRestTestCase {
+
+    public void test() throws Exception {
+        String body = "{\"pipeline\":{\"processors\":[{\"geoip\":{\"field\":\"ip\"}}]}," +
+            "\"docs\":[{\"_index\":\"index\",\"_id\":\"id\",\"_source\":{\"ip\":\"89.160.20.128\"}}]}";
+        Request simulatePipelineRequest = new Request("POST", "/_ingest/pipeline/_simulate");
+        simulatePipelineRequest.setJsonEntity(body);
+        {
+            Map<String, Object> response = toMap(client().performRequest(simulatePipelineRequest));
+            assertThat(ObjectPath.eval("docs.0.doc._source.geoip.city_name", response), equalTo("Tumba"));
+        }
+
+        Path configPath = PathUtils.get(System.getProperty("tests.config.dir"));
+        assertThat(Files.exists(configPath), is(true));
+        Path ingestGeoipDatabaseDir = configPath.resolve("ingest-geoip");
+        Files.createDirectory(ingestGeoipDatabaseDir);
+        Files.copy(UpdateDatabasesIT.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
+            ingestGeoipDatabaseDir.resolve("GeoLite2-City.mmdb"));
+
+        assertBusy(() -> {
+            Map<String, Object> response = toMap(client().performRequest(simulatePipelineRequest));
+            assertThat(ObjectPath.eval("docs.0.doc._source.geoip.city_name", response), equalTo("Linköping"));
+        });
+    }
+
+    private static Map<String, Object> toMap(Response response) throws IOException {
+        return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false);
+    }
+
+}

BIN
modules/ingest-geoip/qa/file-based-update/src/test/resources/GeoLite2-City-Test.mmdb


+ 1 - 1
modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpProcessorNonIngestNodeIT.java

@@ -156,7 +156,7 @@ public class GeoIpProcessorNonIngestNodeIT extends ESIntegTestCase {
     private void assertDatabaseLoadStatus(final String node, final boolean loaded) {
         final IngestService ingestService = internalCluster().getInstance(IngestService.class, node);
         final GeoIpProcessor.Factory factory = (GeoIpProcessor.Factory)ingestService.getProcessorFactories().get("geoip");
-        for (final DatabaseReaderLazyLoader loader : factory.databaseReaders().values()) {
+        for (final DatabaseReaderLazyLoader loader : factory.getAllDatabases()) {
             if (loaded) {
                 assertNotNull(loader.databaseReader.get());
             } else {

+ 164 - 0
modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java

@@ -0,0 +1,164 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.geoip;
+
+import org.elasticsearch.common.network.InetAddresses;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.watcher.ResourceWatcherService;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.ingest.geoip.GeoIpProcessorFactoryTests.copyDatabaseFiles;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+
+public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
+
+    /**
+     * This tests essentially verifies that a Maxmind database reader doesn't fail with:
+     * com.maxmind.db.ClosedDatabaseException: The MaxMind DB has been closed
+     *
+     * This failure can be avoided by ensuring that a database is only closed when no
+     * geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance
+     */
+    public void test() throws Exception {
+        ThreadPool threadPool = new TestThreadPool("test");
+        ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool);
+        try {
+            final Path geoIpDir = createTempDir();
+            copyDatabaseFiles(geoIpDir);
+            final Path geoIpConfigDir = createTempDir();
+            Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
+                geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
+            Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
+                geoIpConfigDir.resolve("GeoLite2-City-Test.mmdb"));
+
+            LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(0));
+            localDatabases.initialize(resourceWatcherService);
+            GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
+            lazyLoadReaders(localDatabases);
+
+            final GeoIpProcessor processor1 = factory.create(null, "_tag", null, new HashMap<>(Map.of("field", "_field")));
+            final GeoIpProcessor processor2 = factory.create(null, "_tag", null,
+                new HashMap<>(Map.of("field", "_field", "database_file", "GeoLite2-City-Test.mmdb")));
+
+            final AtomicBoolean completed = new AtomicBoolean(false);
+            final int numberOfDatabaseUpdates = randomIntBetween(2, 4);
+            final AtomicInteger numberOfIngestRuns = new AtomicInteger();
+            final int numberOfIngestThreads = randomIntBetween(16, 32);
+            final Thread[] ingestThreads = new Thread[numberOfIngestThreads];
+            final AtomicArray<Throwable> ingestFailures = new AtomicArray<>(numberOfIngestThreads);
+            for (int i = 0; i < numberOfIngestThreads; i++) {
+                final int id = i;
+                ingestThreads[id] = new Thread(() -> {
+                    while (completed.get() == false) {
+                        try {
+                            IngestDocument document1 =
+                                new IngestDocument("index", "id", "routing", 1L, VersionType.EXTERNAL, Map.of("_field", "89.160.20.128"));
+                            processor1.execute(document1);
+                            assertThat(document1.getSourceAndMetadata().get("geoip"), notNullValue());
+                            IngestDocument document2 =
+                                new IngestDocument("index", "id", "routing", 1L, VersionType.EXTERNAL, Map.of("_field", "89.160.20.128"));
+                            processor2.execute(document2);
+                            assertThat(document2.getSourceAndMetadata().get("geoip"), notNullValue());
+                            numberOfIngestRuns.incrementAndGet();
+                        } catch (Exception | AssertionError e) {
+                            logger.error("error in ingest thread after run [" + numberOfIngestRuns.get() + "]", e);
+                            ingestFailures.setOnce(id, e);
+                            break;
+                        }
+                    }
+                });
+            }
+
+            final AtomicReference<Throwable> failureHolder2 = new AtomicReference<>();
+            Thread updateDatabaseThread = new Thread(() -> {
+                for (int i = 0; i < numberOfDatabaseUpdates; i++) {
+                    try {
+                        DatabaseReaderLazyLoader previous1 = localDatabases.configDatabases.get("GeoLite2-City.mmdb");
+                        if (Files.exists(geoIpConfigDir.resolve("GeoLite2-City.mmdb")) && randomBoolean()) {
+                            Files.delete(geoIpConfigDir.resolve("GeoLite2-City.mmdb"));
+                        } else {
+                            Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
+                                geoIpConfigDir.resolve("GeoLite2-City.mmdb"), StandardCopyOption.REPLACE_EXISTING);
+                        }
+                        DatabaseReaderLazyLoader previous2 = localDatabases.configDatabases.get("GeoLite2-City-Test.mmdb");
+                        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"),
+                            geoIpConfigDir.resolve("GeoLite2-City-Test.mmdb"), StandardCopyOption.REPLACE_EXISTING);
+                        assertBusy(() -> {
+                            DatabaseReaderLazyLoader current1 = localDatabases.configDatabases.get("GeoLite2-City.mmdb");
+                            DatabaseReaderLazyLoader current2 = localDatabases.configDatabases.get("GeoLite2-City-Test.mmdb");
+                            assertThat(current1, not(sameInstance(previous1)));
+                            assertThat(current2, not(sameInstance(previous2)));
+                        });
+
+                        // lazy load type and reader:
+                        lazyLoadReaders(localDatabases);
+                    } catch (Exception | AssertionError e) {
+                        logger.error("error in update databases thread after run [" + i + "]", e);
+                        failureHolder2.set(e);
+                        break;
+                    }
+                }
+                completed.set(true);
+            });
+
+            Arrays.stream(ingestThreads).forEach(Thread::start);
+            updateDatabaseThread.start();
+            Arrays.stream(ingestThreads).forEach(thread -> {
+                try {
+                    thread.join();
+                } catch (InterruptedException e) {
+                    throw new AssertionError(e);
+                }
+            });
+            updateDatabaseThread.join();
+
+            ingestFailures.asList().forEach(r -> assertThat(r, nullValue()));
+            assertThat(failureHolder2.get(), nullValue());
+            assertThat(numberOfIngestRuns.get(), greaterThan(0));
+
+            for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
+                assertThat(lazyLoader.current(), equalTo(0));
+            }
+        } finally {
+            resourceWatcherService.close();
+            threadPool.shutdown();
+        }
+    }
+
+    private static void lazyLoadReaders(LocalDatabases localDatabases) throws IOException {
+        if (localDatabases.configDatabases.get("GeoLite2-City.mmdb") != null) {
+            localDatabases.configDatabases.get("GeoLite2-City.mmdb").getDatabaseType();
+            localDatabases.configDatabases.get("GeoLite2-City.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
+        }
+        localDatabases.configDatabases.get("GeoLite2-City-Test.mmdb").getDatabaseType();
+        localDatabases.configDatabases.get("GeoLite2-City-Test.mmdb").getCity(InetAddresses.forString("2.125.160.216"));
+    }
+
+}

+ 55 - 2
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

@@ -8,6 +8,8 @@
 
 package org.elasticsearch.ingest.geoip;
 
+import com.maxmind.db.NoCache;
+import com.maxmind.db.Reader;
 import com.maxmind.geoip2.DatabaseReader;
 import com.maxmind.geoip2.exception.AddressNotFoundException;
 import com.maxmind.geoip2.model.AbstractResponse;
@@ -18,8 +20,10 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.SpecialPermission;
+import org.elasticsearch.common.Booleans;
 import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.CheckedSupplier;
+import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.core.internal.io.IOUtils;
 
 import java.io.Closeable;
@@ -32,6 +36,7 @@ import java.nio.file.Path;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used,
@@ -39,6 +44,9 @@ import java.util.Objects;
  */
 class DatabaseReaderLazyLoader implements Closeable {
 
+    private static final boolean LOAD_DATABASE_ON_HEAP =
+        Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
+
     private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class);
 
     private final GeoIpCache cache;
@@ -49,6 +57,12 @@ class DatabaseReaderLazyLoader implements Closeable {
     // cache the database type so that we do not re-read it on every pipeline execution
     final SetOnce<String> databaseType;
 
+    private final AtomicInteger currentUsages = new AtomicInteger(0);
+
+    DatabaseReaderLazyLoader(final GeoIpCache cache, final Path databasePath) {
+        this(cache, databasePath, createDatabaseLoader(databasePath));
+    }
+
     DatabaseReaderLazyLoader(final GeoIpCache cache, final Path databasePath, final CheckedSupplier<DatabaseReader, IOException> loader) {
         this.cache = cache;
         this.databasePath = Objects.requireNonNull(databasePath);
@@ -147,6 +161,20 @@ class DatabaseReaderLazyLoader implements Closeable {
         return getResponse(ipAddress, DatabaseReader::asn);
     }
 
+    boolean preLookup() {
+        return currentUsages.updateAndGet(current -> current < 0 ? current : current + 1) > 0;
+    }
+
+    void postLookup() throws IOException {
+        if (currentUsages.updateAndGet(current -> current > 0 ? current - 1 : current + 1) == -1) {
+            doClose();
+        }
+    }
+
+    int current() {
+        return currentUsages.get();
+    }
+
     private <T extends AbstractResponse> T getResponse(InetAddress ipAddress,
                                                        CheckedBiFunction<DatabaseReader, InetAddress, T, Exception> responseProvider) {
         SpecialPermission.check();
@@ -162,7 +190,7 @@ class DatabaseReaderLazyLoader implements Closeable {
             }));
     }
 
-    private DatabaseReader get() throws IOException {
+    DatabaseReader get() throws IOException {
         if (databaseReader.get() == null) {
             synchronized (databaseReader) {
                 if (databaseReader.get() == null) {
@@ -175,8 +203,33 @@ class DatabaseReaderLazyLoader implements Closeable {
     }
 
     @Override
-    public synchronized void close() throws IOException {
+    public void close() throws IOException {
+        if (currentUsages.updateAndGet(u -> -1 - u) == -1) {
+            doClose();
+        }
+    }
+
+    private void doClose() throws IOException {
         IOUtils.close(databaseReader.get());
+        int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(databasePath);
+        LOGGER.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath);
+    }
+
+    private static CheckedSupplier<DatabaseReader, IOException> createDatabaseLoader(Path databasePath) {
+        return () -> {
+            DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
+            if (LOAD_DATABASE_ON_HEAP) {
+                builder.fileMode(Reader.FileMode.MEMORY);
+            } else {
+                builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
+            }
+            return builder.build();
+        };
+    }
+
+    @SuppressForbidden(reason = "Maxmind API requires java.io.File")
+    private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
+        return new DatabaseReader.Builder(databasePath.toFile());
     }
 
 }

+ 17 - 0
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java

@@ -13,6 +13,7 @@ import org.elasticsearch.common.cache.Cache;
 import org.elasticsearch.common.cache.CacheBuilder;
 
 import java.net.InetAddress;
+import java.nio.file.Path;
 import java.util.Objects;
 import java.util.function.Function;
 
@@ -55,6 +56,22 @@ final class GeoIpCache {
         return cache.get(cacheKey);
     }
 
+    public int purgeCacheEntriesForDatabase(Path databaseFile) {
+        String databasePath = databaseFile.toString();
+        int counter = 0;
+        for (CacheKey key : cache.keys()) {
+            if (key.databasePath.equals(databasePath)) {
+                cache.invalidate(key);
+                counter++;
+            }
+        }
+        return counter;
+    }
+
+    public int count() {
+        return cache.count();
+    }
+
     /**
      * The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the database
      * path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same

+ 61 - 41
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

@@ -18,6 +18,8 @@ import com.maxmind.geoip2.record.Country;
 import com.maxmind.geoip2.record.Location;
 import com.maxmind.geoip2.record.Subdivision;
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.ingest.AbstractProcessor;
@@ -34,6 +36,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@@ -50,7 +53,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
 
     private final String field;
     private final String targetField;
-    private final DatabaseReaderLazyLoader lazyLoader;
+    private final CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier;
     private final Set<Property> properties;
     private final boolean ignoreMissing;
     private final boolean firstOnly;
@@ -60,7 +63,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
      *  @param tag           the processor tag
      * @param description   the processor description
      * @param field         the source field to geo-IP map
-     * @param lazyLoader    a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
+     * @param supplier    a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
      * @param targetField   the target field
      * @param properties    the properties; ideally this is lazily-loaded once on first use
      * @param ignoreMissing true if documents with a missing value for the field should be ignored
@@ -69,7 +72,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
     GeoIpProcessor(
         final String tag,
         String description, final String field,
-        final DatabaseReaderLazyLoader lazyLoader,
+        final CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier,
         final String targetField,
         final Set<Property> properties,
         final boolean ignoreMissing,
@@ -77,7 +80,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
         super(tag, description);
         this.field = field;
         this.targetField = targetField;
-        this.lazyLoader = lazyLoader;
+        this.supplier = supplier;
         this.properties = properties;
         this.ignoreMissing = ignoreMissing;
         this.firstOnly = firstOnly;
@@ -131,32 +134,37 @@ public final class GeoIpProcessor extends AbstractProcessor {
     }
 
     private Map<String, Object> getGeoData(String ip) throws IOException {
-        String databaseType = lazyLoader.getDatabaseType();
-        final InetAddress ipAddress = InetAddresses.forString(ip);
-        Map<String, Object> geoData;
-        if (databaseType.endsWith(CITY_DB_SUFFIX)) {
-            try {
-                geoData = retrieveCityGeoData(ipAddress);
-            } catch (AddressNotFoundRuntimeException e) {
-                geoData = Collections.emptyMap();
-            }
-        } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) {
-            try {
-                geoData = retrieveCountryGeoData(ipAddress);
-            } catch (AddressNotFoundRuntimeException e) {
-                geoData = Collections.emptyMap();
-            }
-        } else if (databaseType.endsWith(ASN_DB_SUFFIX)) {
-            try {
-                geoData = retrieveAsnGeoData(ipAddress);
-            } catch (AddressNotFoundRuntimeException e) {
-                geoData = Collections.emptyMap();
+        DatabaseReaderLazyLoader lazyLoader = this.supplier.get();
+        try {
+            final String databaseType = lazyLoader.getDatabaseType();
+            final InetAddress ipAddress = InetAddresses.forString(ip);
+            Map<String, Object> geoData;
+            if (databaseType.endsWith(CITY_DB_SUFFIX)) {
+                try {
+                    geoData = retrieveCityGeoData(lazyLoader, ipAddress);
+                } catch (AddressNotFoundRuntimeException e) {
+                    geoData = Collections.emptyMap();
+                }
+            } else if (databaseType.endsWith(COUNTRY_DB_SUFFIX)) {
+                try {
+                    geoData = retrieveCountryGeoData(lazyLoader, ipAddress);
+                } catch (AddressNotFoundRuntimeException e) {
+                    geoData = Collections.emptyMap();
+                }
+            } else if (databaseType.endsWith(ASN_DB_SUFFIX)) {
+                try {
+                    geoData = retrieveAsnGeoData(lazyLoader, ipAddress);
+                } catch (AddressNotFoundRuntimeException e) {
+                    geoData = Collections.emptyMap();
+                }
+            } else {
+                throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.getDatabaseType()
+                    + "]", new IllegalStateException());
             }
-        } else {
-            throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.getDatabaseType()
-                + "]", new IllegalStateException());
+            return geoData;
+        } finally {
+            lazyLoader.postLookup();
         }
-        return geoData;
     }
 
     @Override
@@ -173,14 +181,14 @@ public final class GeoIpProcessor extends AbstractProcessor {
     }
 
     String getDatabaseType() throws IOException {
-        return lazyLoader.getDatabaseType();
+        return supplier.get().getDatabaseType();
     }
 
     Set<Property> getProperties() {
         return properties;
     }
 
-    private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
+    private Map<String, Object> retrieveCityGeoData(DatabaseReaderLazyLoader lazyLoader, InetAddress ipAddress) {
         CityResponse response = lazyLoader.getCity(ipAddress);
         Country country = response.getCountry();
         City city = response.getCity();
@@ -255,7 +263,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
         return geoData;
     }
 
-    private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
+    private Map<String, Object> retrieveCountryGeoData(DatabaseReaderLazyLoader lazyLoader, InetAddress ipAddress) {
         CountryResponse response = lazyLoader.getCountry(ipAddress);
         Country country = response.getCountry();
         Continent continent = response.getContinent();
@@ -289,7 +297,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
         return geoData;
     }
 
-    private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
+    private Map<String, Object> retrieveAsnGeoData(DatabaseReaderLazyLoader lazyLoader, InetAddress ipAddress) {
         AsnResponse response = lazyLoader.getAsn(ipAddress);
         Integer asn = response.getAutonomousSystemNumber();
         String organization_name = response.getAutonomousSystemOrganization();
@@ -333,14 +341,14 @@ public final class GeoIpProcessor extends AbstractProcessor {
             Property.IP, Property.ASN, Property.ORGANIZATION_NAME, Property.NETWORK
         ));
 
-        private final Map<String, DatabaseReaderLazyLoader> databaseReaders;
+        private final LocalDatabases localDatabases;
 
-        Map<String, DatabaseReaderLazyLoader> databaseReaders() {
-            return Collections.unmodifiableMap(databaseReaders);
+        List<DatabaseReaderLazyLoader> getAllDatabases() {
+            return localDatabases.getAllDatabases();
         }
 
-        public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) {
-            this.databaseReaders = databaseReaders;
+        public Factory(LocalDatabases localDatabases) {
+            this.localDatabases = localDatabases;
         }
 
         @Override
@@ -355,13 +363,17 @@ public final class GeoIpProcessor extends AbstractProcessor {
             boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
             boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true);
 
-            DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile);
+            DatabaseReaderLazyLoader lazyLoader = localDatabases.getDatabase(databaseFile);
             if (lazyLoader == null) {
                 throw newConfigurationException(TYPE, processorTag,
                     "database_file", "database file [" + databaseFile + "] doesn't exist");
             }
-
-            final String databaseType = lazyLoader.getDatabaseType();
+            final String databaseType;
+            try {
+                databaseType = lazyLoader.getDatabaseType();
+            } finally {
+                lazyLoader.postLookup();
+            }
 
             final Set<Property> properties;
             if (propertyNames != null) {
@@ -387,7 +399,15 @@ public final class GeoIpProcessor extends AbstractProcessor {
                 }
             }
 
-            return new GeoIpProcessor(processorTag, description, ipField, lazyLoader, targetField, properties, ignoreMissing, firstOnly);
+            CheckedSupplier<DatabaseReaderLazyLoader, IOException> supplier = () -> {
+                DatabaseReaderLazyLoader loader = localDatabases.getDatabase(databaseFile);
+                if (loader == null) {
+                    throw new ResourceNotFoundException("database_file", "database file [" + databaseFile + "] doesn't exist");
+                }
+                assert Objects.equals(databaseType, loader.getDatabaseType());
+                return loader;
+            };
+            return new GeoIpProcessor(processorTag, description, ipField, supplier, targetField, properties, ignoreMissing, firstOnly);
         }
     }
 

+ 37 - 104
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java

@@ -8,30 +8,31 @@
 
 package org.elasticsearch.ingest.geoip;
 
-import com.maxmind.db.NoCache;
-import com.maxmind.db.Reader;
-import com.maxmind.geoip2.DatabaseReader;
-import org.elasticsearch.common.Booleans;
-import org.elasticsearch.common.CheckedSupplier;
-import org.elasticsearch.common.SuppressForbidden;
-import org.elasticsearch.common.io.PathUtils;
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.plugins.IngestPlugin;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.watcher.ResourceWatcherService;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.PathMatcher;
+import java.io.UncheckedIOException;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Stream;
+import java.util.function.Supplier;
 
 public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable {
     public static final Setting<Long> CACHE_SIZE =
@@ -39,7 +40,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
 
     static String[] DEFAULT_DATABASE_FILENAMES = new String[]{"GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"};
 
-    private Map<String, DatabaseReaderLazyLoader> databaseReaders;
+    private final SetOnce<LocalDatabases> localDatabases = new SetOnce<>();
 
     @Override
     public List<Setting<?>> getSettings() {
@@ -48,104 +49,36 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, Closeable
 
     @Override
     public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
-        if (databaseReaders != null) {
-            throw new IllegalStateException("getProcessors called twice for geoip plugin!!");
-        }
         final long cacheSize = CACHE_SIZE.get(parameters.env.settings());
         final GeoIpCache cache = new GeoIpCache(cacheSize);
-        final Path geoIpDirectory = getGeoIpDirectory(parameters);
-        final Path geoIpConfigDirectory = parameters.env.configFile().resolve("ingest-geoip");
-        try {
-            databaseReaders = loadDatabaseReaders(cache, geoIpDirectory, geoIpConfigDirectory);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders));
+        LocalDatabases localDatabases = new LocalDatabases(parameters.env, cache);
+        this.localDatabases.set(localDatabases);
+        return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(localDatabases));
     }
 
-    /*
-     * In GeoIpProcessorNonIngestNodeTests, ingest-geoip is loaded on the classpath. This means that the plugin is never unbundled into a
-     * directory where the database files would live. Therefore, we have to copy these database files ourselves. To do this, we need the
-     * ability to specify where those database files would go. We do this by adding a plugin that registers ingest.geoip.database_path as
-     * an actual setting. Otherwise, in production code, this setting is not registered and the database path is not configurable.
-     */
-    @SuppressForbidden(reason = "PathUtils#get")
-    private Path getGeoIpDirectory(Processor.Parameters parameters) {
-        final Path geoIpDirectory;
-        if (parameters.env.settings().get("ingest.geoip.database_path") == null) {
-            geoIpDirectory = parameters.env.modulesFile().resolve("ingest-geoip");
-        } else {
-            geoIpDirectory = PathUtils.get(parameters.env.settings().get("ingest.geoip.database_path"));
-        }
-        return geoIpDirectory;
-    }
-
-    static Map<String, DatabaseReaderLazyLoader> loadDatabaseReaders(GeoIpCache cache,
-                                                                     Path geoIpDirectory,
-                                                                     Path geoIpConfigDirectory) throws IOException {
-        assertDatabaseExistence(geoIpDirectory, true);
-        assertDatabaseExistence(geoIpConfigDirectory, false);
-        final boolean loadDatabaseOnHeap = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
-        final Map<String, DatabaseReaderLazyLoader> databaseReaders = new HashMap<>();
-
-        // load the default databases
-        for (final String databaseFilename : DEFAULT_DATABASE_FILENAMES) {
-            final Path databasePath = geoIpDirectory.resolve(databaseFilename);
-            final DatabaseReaderLazyLoader loader = createLoader(cache, databasePath, loadDatabaseOnHeap);
-            databaseReaders.put(databaseFilename, loader);
-        }
-
-        // load any custom databases
-        if (Files.exists(geoIpConfigDirectory)) {
-            try (Stream<Path> databaseFiles = Files.list(geoIpConfigDirectory)) {
-                PathMatcher pathMatcher = geoIpConfigDirectory.getFileSystem().getPathMatcher("glob:**.mmdb");
-                // Use iterator instead of forEach otherwise IOException needs to be caught twice...
-                Iterator<Path> iterator = databaseFiles.iterator();
-                while (iterator.hasNext()) {
-                    Path databasePath = iterator.next();
-                    if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
-                        String databaseFileName = databasePath.getFileName().toString();
-                        final DatabaseReaderLazyLoader loader = createLoader(cache, databasePath, loadDatabaseOnHeap);
-                        databaseReaders.put(databaseFileName, loader);
-                    }
-                }
-            }
-        }
-        return Collections.unmodifiableMap(databaseReaders);
-    }
-
-    private static DatabaseReaderLazyLoader createLoader(GeoIpCache cache, Path databasePath, boolean loadDatabaseOnHeap) {
-        CheckedSupplier<DatabaseReader, IOException> loader = () -> {
-            DatabaseReader.Builder builder = createDatabaseBuilder(databasePath).withCache(NoCache.getInstance());
-            if (loadDatabaseOnHeap) {
-                builder.fileMode(Reader.FileMode.MEMORY);
-            } else {
-                builder.fileMode(Reader.FileMode.MEMORY_MAPPED);
-            }
-            return builder.build();
-        };
-        return new DatabaseReaderLazyLoader(cache, databasePath, loader);
-    }
-
-    private static void assertDatabaseExistence(final Path path, final boolean exists) throws IOException {
-        for (final String database : DEFAULT_DATABASE_FILENAMES) {
-            if (Files.exists(path.resolve(database)) != exists) {
-                final String message = "expected database [" + database + "] to " + (exists ? "" : "not ") + "exist in [" + path + "]";
-                throw new IOException(message);
-            }
+    @Override
+    public Collection<Object> createComponents(Client client,
+                                               ClusterService clusterService,
+                                               ThreadPool threadPool,
+                                               ResourceWatcherService resourceWatcherService,
+                                               ScriptService scriptService,
+                                               NamedXContentRegistry xContentRegistry,
+                                               Environment environment,
+                                               NodeEnvironment nodeEnvironment,
+                                               NamedWriteableRegistry namedWriteableRegistry,
+                                               IndexNameExpressionResolver indexNameExpressionResolver,
+                                               Supplier<RepositoriesService> repositoriesServiceSupplier) {
+        try {
+            localDatabases.get().initialize(resourceWatcherService);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
         }
-    }
-
-    @SuppressForbidden(reason = "Maxmind API requires java.io.File")
-    private static DatabaseReader.Builder createDatabaseBuilder(Path databasePath) {
-        return new DatabaseReader.Builder(databasePath.toFile());
+        return List.of();
     }
 
     @Override
     public void close() throws IOException {
-        if (databaseReaders != null) {
-            IOUtils.close(databaseReaders.values());
-        }
+        localDatabases.get().close();
     }
 
 }

+ 212 - 0
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/LocalDatabases.java

@@ -0,0 +1,212 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.ingest.geoip;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.logging.log4j.util.Supplier;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.watcher.FileChangesListener;
+import org.elasticsearch.watcher.FileWatcher;
+import org.elasticsearch.watcher.ResourceWatcherService;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
+
+import static org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES;
+
+/**
+ * Keeps track of the databases locally available to a node:
+ * 1) Default databases shipped with the default distribution via ingest-geoip module
+ * 2) User provided databases from the ES_HOME/config/ingest-geoip directory. This directory is monitored
+ *    and files updates are picked up and may cause databases being loaded or removed at runtime.
+ */
+final class LocalDatabases implements Closeable {
+
+    private static final Logger LOGGER = LogManager.getLogger(LocalDatabases.class);
+
+    private final GeoIpCache cache;
+    private final Path geoipConfigDir;
+
+    private final Map<String, DatabaseReaderLazyLoader> defaultDatabases;
+    final ConcurrentMap<String, DatabaseReaderLazyLoader> configDatabases;
+
+    LocalDatabases(Environment environment, GeoIpCache cache) {
+        this(
+            // In GeoIpProcessorNonIngestNodeTests, ingest-geoip is loaded on the classpath.
+            // This means that the plugin is never unbundled into a directory where the database files would live.
+            // Therefore, we have to copy these database files ourselves. To do this, we need the ability to specify where
+            // those database files would go. We do this by adding a plugin that registers ingest.geoip.database_path as an
+            // actual setting. Otherwise, in production code, this setting is not registered and the database path is not configurable.
+            environment.settings().get("ingest.geoip.database_path") != null ?
+                getGeoipConfigDirectory(environment) :
+                environment.modulesFile().resolve("ingest-geoip"),
+            environment.configFile().resolve("ingest-geoip"),
+            cache);
+    }
+
+    LocalDatabases(Path geoipModuleDir, Path geoipConfigDir, GeoIpCache cache) {
+        this.cache = cache;
+        this.geoipConfigDir = geoipConfigDir;
+        this.configDatabases = new ConcurrentHashMap<>();
+        this.defaultDatabases = initDefaultDatabases(geoipModuleDir);
+    }
+
+    void initialize(ResourceWatcherService resourceWatcher) throws IOException {
+        configDatabases.putAll(initConfigDatabases(geoipConfigDir));
+
+        FileWatcher watcher = new FileWatcher(geoipConfigDir);
+        watcher.addListener(new GeoipDirectoryListener());
+        resourceWatcher.add(watcher, ResourceWatcherService.Frequency.HIGH);
+
+        LOGGER.info("initialized default databases [{}], config databases [{}] and watching [{}] for changes",
+            defaultDatabases.keySet(), configDatabases.keySet(), geoipConfigDir);
+    }
+
+    // There is a need for reference counting in order to avoid using an instance
+    // that gets closed while using it. (this can happen during a database update)
+    DatabaseReaderLazyLoader getDatabase(String name) {
+        while (true) {
+            DatabaseReaderLazyLoader instance = configDatabases.getOrDefault(name, defaultDatabases.get(name));
+            if (instance == null) {
+                return null;
+            }
+            if (instance.preLookup()) {
+                return instance;
+            }
+            // instance is closed after incrementing its usage,
+            // drop this instance and fetch another one.
+        }
+    }
+
+    List<DatabaseReaderLazyLoader> getAllDatabases() {
+        List<DatabaseReaderLazyLoader> all = new ArrayList<>(defaultDatabases.values());
+        all.addAll(configDatabases.values());
+        return all;
+    }
+
+    Map<String, DatabaseReaderLazyLoader> getDefaultDatabases() {
+        return defaultDatabases;
+    }
+
+    Map<String, DatabaseReaderLazyLoader> getConfigDatabases() {
+        return configDatabases;
+    }
+
+    void updateDatabase(Path file, boolean update) {
+        String databaseFileName = file.getFileName().toString();
+        try {
+            if (update) {
+                LOGGER.info("database file changed [{}], reload database...", file);
+                DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file);
+                DatabaseReaderLazyLoader existing = configDatabases.put(databaseFileName, loader);
+                if (existing != null) {
+                    existing.close();
+                }
+            } else {
+                LOGGER.info("database file removed [{}], close database...", file);
+                DatabaseReaderLazyLoader existing = configDatabases.remove(databaseFileName);
+                assert existing != null;
+                existing.close();
+            }
+        } catch (Exception e) {
+            LOGGER.error((Supplier<?>) () -> new ParameterizedMessage("failed to update database [{}]", databaseFileName), e);
+        }
+    }
+
+    Map<String, DatabaseReaderLazyLoader> initDefaultDatabases(Path geoipModuleDir) {
+        Map<String, DatabaseReaderLazyLoader> databases = new HashMap<>(DEFAULT_DATABASE_FILENAMES.length);
+
+        for (String filename : DEFAULT_DATABASE_FILENAMES) {
+            Path source = geoipModuleDir.resolve(filename);
+            assert Files.exists(source);
+            String databaseFileName = source.getFileName().toString();
+            DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, source);
+            databases.put(databaseFileName, loader);
+        }
+
+        return Collections.unmodifiableMap(databases);
+    }
+
+    Map<String, DatabaseReaderLazyLoader> initConfigDatabases(Path geoipConfigDir) throws IOException {
+        Map<String, DatabaseReaderLazyLoader> databases = new HashMap<>();
+
+        if (geoipConfigDir != null && Files.exists(geoipConfigDir)) {
+            try (Stream<Path> databaseFiles = Files.list(geoipConfigDir)) {
+                PathMatcher pathMatcher = geoipConfigDir.getFileSystem().getPathMatcher("glob:**.mmdb");
+                // Use iterator instead of forEach otherwise IOException needs to be caught twice...
+                Iterator<Path> iterator = databaseFiles.iterator();
+                while (iterator.hasNext()) {
+                    Path databasePath = iterator.next();
+                    if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
+                        assert Files.exists(databasePath);
+                        String databaseFileName = databasePath.getFileName().toString();
+                        DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, databasePath);
+                        databases.put(databaseFileName, loader);
+                    }
+                }
+            }
+        }
+
+        return Collections.unmodifiableMap(databases);
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (DatabaseReaderLazyLoader lazyLoader : defaultDatabases.values()) {
+            lazyLoader.close();
+        }
+        for (DatabaseReaderLazyLoader lazyLoader : configDatabases.values()) {
+            lazyLoader.close();
+        }
+    }
+
+    @SuppressForbidden(reason = "PathUtils#get")
+    private static Path getGeoipConfigDirectory(Environment environment) {
+        return PathUtils.get(environment.settings().get("ingest.geoip.database_path"));
+    }
+
+    private class GeoipDirectoryListener implements FileChangesListener {
+
+        @Override
+        public void onFileCreated(Path file) {
+            onFileChanged(file);
+        }
+
+        @Override
+        public void onFileDeleted(Path file) {
+            PathMatcher pathMatcher = file.getFileSystem().getPathMatcher("glob:**.mmdb");
+            if (pathMatcher.matches(file)) {
+                updateDatabase(file, false);
+            }
+        }
+
+        @Override
+        public void onFileChanged(Path file) {
+            PathMatcher pathMatcher = file.getFileSystem().getPathMatcher("glob:**.mmdb");
+            if (pathMatcher.matches(file)) {
+                updateDatabase(file, true);
+            }
+        }
+    }
+}

+ 39 - 65
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java

@@ -11,10 +11,14 @@ package org.elasticsearch.ingest.geoip;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.StreamsUtils;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.watcher.ResourceWatcherService;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -31,14 +35,12 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.sameInstance;
 
 public class GeoIpProcessorFactoryTests extends ESTestCase {
 
-    private static Map<String, DatabaseReaderLazyLoader> databaseReaders;
+    private static LocalDatabases localDatabases;
 
     @BeforeClass
     public static void loadDatabaseReaders() throws IOException {
@@ -47,20 +49,17 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         final Path geoIpConfigDir = configDir.resolve("ingest-geoip");
         Files.createDirectories(geoIpConfigDir);
         copyDatabaseFiles(geoIpDir);
-
-        databaseReaders = IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir);
+        localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000));
     }
 
     @AfterClass
     public static void closeDatabaseReaders() throws IOException {
-        for (DatabaseReaderLazyLoader reader : databaseReaders.values()) {
-            reader.close();
-        }
-        databaseReaders = null;
+        localDatabases.close();
+        localDatabases = null;
     }
 
     public void testBuildDefaults() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -76,7 +75,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testSetIgnoreMissing() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -93,7 +92,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCountryBuildDefaults() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -111,7 +110,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testAsnBuildDefaults() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -129,7 +128,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildTargetField() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("target_field", "_field");
@@ -140,7 +139,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildDbFile() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-Country.mmdb");
@@ -153,7 +152,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildWithCountryDbAndAsnFields() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-Country.mmdb");
@@ -167,7 +166,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildWithAsnDbAndCityFields() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
         config.put("database_file", "GeoLite2-ASN.mmdb");
@@ -181,7 +180,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildNonExistingDbFile() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
 
         Map<String, Object> config = new HashMap<>();
         config.put("field", "_field");
@@ -191,7 +190,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildFields() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
 
         Set<GeoIpProcessor.Property> properties = EnumSet.noneOf(GeoIpProcessor.Property.class);
         List<String> fieldNames = new ArrayList<>();
@@ -215,7 +214,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
     }
 
     public void testBuildIllegalFieldOption() throws Exception {
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
 
         Map<String, Object> config1 = new HashMap<>();
         config1.put("field", "_field");
@@ -241,10 +240,9 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         // Loading another database reader instances, because otherwise we can't test lazy loading as the
         // database readers used at class level are reused between tests. (we want to keep that otherwise running this
         // test will take roughly 4 times more time)
-        Map<String, DatabaseReaderLazyLoader> databaseReaders =
-            IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir);
-        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
-        for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
+        LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000));
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
+        for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
             assertNull(lazyLoader.databaseReader.get());
         }
 
@@ -257,10 +255,10 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         final GeoIpProcessor city = factory.create(null, "_tag", null, config);
 
         // these are lazy loaded until first use so we expect null here
-        assertNull(databaseReaders.get("GeoLite2-City.mmdb").databaseReader.get());
+        assertNull(localDatabases.getDatabase("GeoLite2-City.mmdb").databaseReader.get());
         city.execute(document);
         // the first ingest should trigger a database load
-        assertNotNull(databaseReaders.get("GeoLite2-City.mmdb").databaseReader.get());
+        assertNotNull(localDatabases.getDatabase("GeoLite2-City.mmdb").databaseReader.get());
 
         config = new HashMap<>();
         config.put("field", "_field");
@@ -268,10 +266,10 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         final GeoIpProcessor country = factory.create(null, "_tag", null, config);
 
         // these are lazy loaded until first use so we expect null here
-        assertNull(databaseReaders.get("GeoLite2-Country.mmdb").databaseReader.get());
+        assertNull(localDatabases.getDatabase("GeoLite2-Country.mmdb").databaseReader.get());
         country.execute(document);
         // the first ingest should trigger a database load
-        assertNotNull(databaseReaders.get("GeoLite2-Country.mmdb").databaseReader.get());
+        assertNotNull(localDatabases.getDatabase("GeoLite2-Country.mmdb").databaseReader.get());
 
         config = new HashMap<>();
         config.put("field", "_field");
@@ -279,10 +277,10 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         final GeoIpProcessor asn = factory.create(null, "_tag", null, config);
 
         // these are lazy loaded until first use so we expect null here
-        assertNull(databaseReaders.get("GeoLite2-ASN.mmdb").databaseReader.get());
+        assertNull(localDatabases.getDatabase("GeoLite2-ASN.mmdb").databaseReader.get());
         asn.execute(document);
         // the first ingest should trigger a database load
-        assertNotNull(databaseReaders.get("GeoLite2-ASN.mmdb").databaseReader.get());
+        assertNotNull(localDatabases.getDatabase("GeoLite2-ASN.mmdb").databaseReader.get());
     }
 
     public void testLoadingCustomDatabase() throws IOException {
@@ -299,10 +297,12 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
          * Loading another database reader instances, because otherwise we can't test lazy loading as the database readers used at class
          * level are reused between tests. (we want to keep that otherwise running this test will take roughly 4 times more time).
          */
-        final Map<String, DatabaseReaderLazyLoader> databaseReaders =
-            IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir);
-        final GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseReaders);
-        for (DatabaseReaderLazyLoader lazyLoader : databaseReaders.values()) {
+        ThreadPool threadPool = new TestThreadPool("test");
+        ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool);
+        LocalDatabases localDatabases = new LocalDatabases(geoIpDir, geoIpConfigDir, new GeoIpCache(1000));
+        localDatabases.initialize(resourceWatcherService);
+        GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(localDatabases);
+        for (DatabaseReaderLazyLoader lazyLoader : localDatabases.getAllDatabases()) {
             assertNull(lazyLoader.databaseReader.get());
         }
 
@@ -315,38 +315,12 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
         final GeoIpProcessor city = factory.create(null, "_tag", null, config);
 
         // these are lazy loaded until first use so we expect null here
-        assertNull(databaseReaders.get("GeoIP2-City.mmdb").databaseReader.get());
+        assertNull(localDatabases.getDatabase("GeoIP2-City.mmdb").databaseReader.get());
         city.execute(document);
         // the first ingest should trigger a database load
-        assertNotNull(databaseReaders.get("GeoIP2-City.mmdb").databaseReader.get());
-    }
-
-    public void testDatabaseNotExistsInDir() throws IOException {
-        final Path geoIpDir = createTempDir();
-        final Path configDir = createTempDir();
-        final Path geoIpConfigDir = configDir.resolve("ingest-geoip");
-        if (randomBoolean()) {
-            Files.createDirectories(geoIpConfigDir);
-        }
-        copyDatabaseFiles(geoIpDir);
-        final String databaseFilename = randomFrom(IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES);
-        Files.delete(geoIpDir.resolve(databaseFilename));
-        final IOException e = expectThrows(IOException.class,
-            () -> IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir));
-        assertThat(e, hasToString(containsString("expected database [" + databaseFilename + "] to exist in [" + geoIpDir + "]")));
-    }
-
-    public void testDatabaseExistsInConfigDir() throws IOException {
-        final Path geoIpDir = createTempDir();
-        final Path configDir = createTempDir();
-        final Path geoIpConfigDir = configDir.resolve("ingest-geoip");
-        Files.createDirectories(geoIpConfigDir);
-        copyDatabaseFiles(geoIpDir);
-        final String databaseFilename = randomFrom(IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES);
-        copyDatabaseFile(geoIpConfigDir, databaseFilename);
-        final IOException e = expectThrows(IOException.class,
-            () -> IngestGeoIpPlugin.loadDatabaseReaders(new GeoIpCache(1000), geoIpDir, geoIpConfigDir));
-        assertThat(e, hasToString(containsString("expected database [" + databaseFilename + "] to not exist in [" + geoIpConfigDir + "]")));
+        assertNotNull(localDatabases.getDatabase("GeoIP2-City.mmdb").databaseReader.get());
+        resourceWatcherService.close();
+        threadPool.shutdown();
     }
 
     private static void copyDatabaseFile(final Path path, final String databaseFilename) throws IOException {
@@ -355,7 +329,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
                 path.resolve(databaseFilename));
     }
 
-    private static void copyDatabaseFiles(final Path path) throws IOException {
+    static void copyDatabaseFiles(final Path path) throws IOException {
         for (final String databaseFilename : IngestGeoIpPlugin.DEFAULT_DATABASE_FILENAMES) {
             copyDatabaseFile(path, databaseFilename);
         }

+ 5 - 3
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java

@@ -299,11 +299,12 @@ public class GeoIpProcessorTests extends ESTestCase {
         assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false));
     }
 
-    private DatabaseReaderLazyLoader loader(final String path) {
+    private CheckedSupplier<DatabaseReaderLazyLoader, IOException> loader(final String path) {
         final Supplier<InputStream> databaseInputStreamSupplier = () -> GeoIpProcessor.class.getResourceAsStream(path);
         final CheckedSupplier<DatabaseReader, IOException> loader =
-                () -> new DatabaseReader.Builder(databaseInputStreamSupplier.get()).build();
-        return new DatabaseReaderLazyLoader(new GeoIpCache(1000), PathUtils.get(path), loader) {
+            () -> new DatabaseReader.Builder(databaseInputStreamSupplier.get()).build();
+        final GeoIpCache cache = new GeoIpCache(1000);
+        DatabaseReaderLazyLoader lazyLoader = new DatabaseReaderLazyLoader(cache, PathUtils.get(path), loader) {
 
             @Override
             long databaseFileSize() throws IOException {
@@ -325,6 +326,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             }
 
         };
+        return () -> lazyLoader;
     }
 
 }

+ 170 - 0
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/LocalDatabasesTests.java

@@ -0,0 +1,170 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest.geoip;
+
+import com.maxmind.geoip2.model.CityResponse;
+import org.elasticsearch.common.network.InetAddresses;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.watcher.ResourceWatcherService;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class LocalDatabasesTests extends ESTestCase {
+
+    private ThreadPool threadPool;
+    private ResourceWatcherService resourceWatcherService;
+
+    @Before
+    public void setup() {
+        threadPool = new TestThreadPool(LocalDatabases.class.getSimpleName());
+        resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool);
+    }
+
+    @After
+    public void cleanup() {
+        resourceWatcherService.close();
+        threadPool.shutdownNow();
+    }
+
+    public void testLocalDatabasesEmptyConfig() throws Exception {
+        Path configDir = createTempDir();
+        LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0));
+        localDatabases.initialize(resourceWatcherService);
+
+        assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
+        assertThat(localDatabases.getConfigDatabases().size(), equalTo(0));
+        DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb");
+        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN"));
+
+        loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
+        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
+
+        loader = localDatabases.getDatabase("GeoLite2-Country.mmdb");
+        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country"));
+    }
+
+    public void testDatabasesConfigDir() throws Exception {
+        Path configDir = createTempDir();
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoIP2-City-Test.mmdb"), configDir.resolve("GeoIP2-City.mmdb"));
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"));
+
+        LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0));
+        localDatabases.initialize(resourceWatcherService);
+
+        assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
+        assertThat(localDatabases.getConfigDatabases().size(), equalTo(2));
+        DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb");
+        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN"));
+
+        loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
+        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
+
+        loader = localDatabases.getDatabase("GeoLite2-Country.mmdb");
+        assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country"));
+
+        loader = localDatabases.getDatabase("GeoIP2-City.mmdb");
+        assertThat(loader.getDatabaseType(), equalTo("GeoIP2-City"));
+    }
+
+    public void testDatabasesDynamicUpdateConfigDir() throws Exception {
+        Path configDir = createTempDir();
+        LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, new GeoIpCache(0));
+        localDatabases.initialize(resourceWatcherService);
+        {
+            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
+            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN"));
+
+            loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
+
+            loader = localDatabases.getDatabase("GeoLite2-Country.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country"));
+        }
+
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoIP2-City-Test.mmdb"), configDir.resolve("GeoIP2-City.mmdb"));
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"));
+        assertBusy(() -> {
+            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
+            assertThat(localDatabases.getConfigDatabases().size(), equalTo(2));
+            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-ASN.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoLite2-ASN"));
+
+            loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
+
+            loader = localDatabases.getDatabase("GeoLite2-Country.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoLite2-Country"));
+
+            loader = localDatabases.getDatabase("GeoIP2-City.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoIP2-City"));
+        });
+    }
+
+    public void testDatabasesUpdateExistingConfDatabase() throws Exception {
+        Path configDir = createTempDir();
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City.mmdb"), configDir.resolve("GeoLite2-City.mmdb"));
+        GeoIpCache cache = new GeoIpCache(1000); // real cache to test purging of entries upon a reload
+        LocalDatabases localDatabases = new LocalDatabases(prepareModuleDir(), configDir, cache);
+        localDatabases.initialize(resourceWatcherService);
+        {
+            assertThat(cache.count(), equalTo(0));
+            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
+            assertThat(localDatabases.getConfigDatabases().size(), equalTo(1));
+
+            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
+            CityResponse cityResponse = loader.getCity(InetAddresses.forString("89.160.20.128"));
+            assertThat(cityResponse.getCity().getName(), equalTo("Tumba"));
+            assertThat(cache.count(), equalTo(1));
+            loader.postLookup();
+        }
+
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City-Test.mmdb"), configDir.resolve("GeoLite2-City.mmdb"),
+            StandardCopyOption.REPLACE_EXISTING);
+        assertBusy(() -> {
+            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
+            assertThat(localDatabases.getConfigDatabases().size(), equalTo(1));
+            assertThat(cache.count(), equalTo(0));
+
+            DatabaseReaderLazyLoader loader = localDatabases.getDatabase("GeoLite2-City.mmdb");
+            assertThat(loader.getDatabaseType(), equalTo("GeoLite2-City"));
+            CityResponse cityResponse = loader.getCity(InetAddresses.forString("89.160.20.128"));
+            assertThat(cityResponse.getCity().getName(), equalTo("Linköping"));
+            assertThat(cache.count(), equalTo(1));
+            loader.postLookup();
+        });
+
+        Files.delete(configDir.resolve("GeoLite2-City.mmdb"));
+        assertBusy(() -> {
+            assertThat(localDatabases.getDefaultDatabases().size(), equalTo(3));
+            assertThat(localDatabases.getConfigDatabases().size(), equalTo(0));
+            assertThat(cache.count(), equalTo(0));
+        });
+    }
+
+    private static Path prepareModuleDir() throws IOException {
+        Path dir = createTempDir();
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-ASN.mmdb"), dir.resolve("GeoLite2-ASN.mmdb"));
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-City.mmdb"), dir.resolve("GeoLite2-City.mmdb"));
+        Files.copy(LocalDatabases.class.getResourceAsStream("/GeoLite2-Country.mmdb"), dir.resolve("GeoLite2-Country.mmdb"));
+        return dir;
+    }
+
+}

BIN
modules/ingest-geoip/src/test/resources/GeoIP2-City-Test.mmdb


BIN
modules/ingest-geoip/src/test/resources/GeoLite2-City-Test.mmdb