Browse Source

Rework close and shutdown for the geoip processor (#113138) (#113147)

Joe Gallo 1 year ago
parent
commit
5d06f29675

+ 3 - 3
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/ConfigDatabases.java

@@ -77,13 +77,13 @@ final class ConfigDatabases implements Closeable {
                 DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, null);
                 DatabaseReaderLazyLoader existing = configDatabases.put(databaseFileName, loader);
                 if (existing != null) {
-                    existing.close();
+                    existing.shutdown();
                 }
             } else {
                 logger.info("database file removed [{}], close database...", file);
                 DatabaseReaderLazyLoader existing = configDatabases.remove(databaseFileName);
                 assert existing != null;
-                existing.close();
+                existing.shutdown();
             }
         } catch (Exception e) {
             logger.error(() -> "failed to update database [" + databaseFileName + "]", e);
@@ -116,7 +116,7 @@ final class ConfigDatabases implements Closeable {
     @Override
     public void close() throws IOException {
         for (DatabaseReaderLazyLoader lazyLoader : configDatabases.values()) {
-            lazyLoader.close();
+            lazyLoader.shutdown();
         }
     }
 

+ 18 - 6
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java

@@ -86,7 +86,7 @@ import static org.elasticsearch.ingest.geoip.GeoIpTaskState.getGeoIpTaskState;
  * if there is an old instance of this database then that is closed.
  * 4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.
  */
-public final class DatabaseNodeService implements IpDatabaseProvider, Closeable {
+public final class DatabaseNodeService implements IpDatabaseProvider {
 
     private static final Logger logger = LogManager.getLogger(DatabaseNodeService.class);
 
@@ -235,9 +235,21 @@ public final class DatabaseNodeService implements IpDatabaseProvider, Closeable
         return databases.get(key);
     }
 
-    @Override
-    public void close() throws IOException {
-        IOUtils.close(databases.values());
+    public void shutdown() throws IOException {
+        // this is a little 'fun' looking, but it's just adapting IOUtils.close() into something
+        // that can call a bunch of shutdown methods (rather than close methods)
+        final var loadersToShutdown = databases.values().stream().map(ShutdownCloseable::new).toList();
+        databases.clear();
+        IOUtils.close(loadersToShutdown);
+    }
+
+    private record ShutdownCloseable(DatabaseReaderLazyLoader loader) implements Closeable {
+        @Override
+        public void close() throws IOException {
+            if (loader != null) {
+                loader.shutdown();
+            }
+        }
     }
 
     void checkDatabases(ClusterState state) {
@@ -420,7 +432,7 @@ public final class DatabaseNodeService implements IpDatabaseProvider, Closeable
             DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
             DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
             if (existing != null) {
-                existing.close();
+                existing.shutdown();
             } else {
                 // Loaded a database for the first time, so reload pipelines for which a database was not available:
                 Predicate<GeoIpProcessor.DatabaseUnavailableProcessor> predicate = p -> databaseFileName.equals(p.getDatabaseName());
@@ -458,7 +470,7 @@ public final class DatabaseNodeService implements IpDatabaseProvider, Closeable
                 logger.debug("database [{}] no longer exists, cleaning up...", staleEntry);
                 DatabaseReaderLazyLoader existing = databases.remove(staleEntry);
                 assert existing != null;
-                existing.close(true);
+                existing.shutdown(true);
             } catch (Exception e) {
                 logger.error(() -> "failed to clean database [" + staleEntry + "]", e);
             }

+ 13 - 16
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

@@ -13,7 +13,6 @@ import com.maxmind.db.DatabaseRecord;
 import com.maxmind.db.Network;
 import com.maxmind.db.NoCache;
 import com.maxmind.db.Reader;
-import com.maxmind.geoip2.model.AbstractResponse;
 import com.maxmind.geoip2.model.AnonymousIpResponse;
 import com.maxmind.geoip2.model.AsnResponse;
 import com.maxmind.geoip2.model.CityResponse;
@@ -36,7 +35,6 @@ import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.SuppressForbidden;
 
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -51,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used,
  * no memory is being wasted on the database reader.
  */
-class DatabaseReaderLazyLoader implements IpDatabase, Closeable {
+class DatabaseReaderLazyLoader implements IpDatabase {
 
     private static final boolean LOAD_DATABASE_ON_HEAP = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));
 
@@ -66,7 +64,7 @@ class DatabaseReaderLazyLoader implements IpDatabase, Closeable {
     // cache the database type so that we do not re-read it on every pipeline execution
     final SetOnce<String> databaseType;
 
-    private volatile boolean deleteDatabaseFileOnClose;
+    private volatile boolean deleteDatabaseFileOnShutdown;
     private final AtomicInteger currentUsages = new AtomicInteger(0);
 
     DatabaseReaderLazyLoader(GeoIpCache cache, Path databasePath, String md5) {
@@ -189,9 +187,9 @@ class DatabaseReaderLazyLoader implements IpDatabase, Closeable {
     }
 
     @Override
-    public void release() throws IOException {
+    public void close() throws IOException {
         if (currentUsages.updateAndGet(current -> current > 0 ? current - 1 : current + 1) == -1) {
-            doClose();
+            doShutdown();
         }
     }
 
@@ -200,9 +198,9 @@ class DatabaseReaderLazyLoader implements IpDatabase, Closeable {
     }
 
     @Nullable
-    private <T extends AbstractResponse> T getResponse(
+    private <RESPONSE> RESPONSE getResponse(
         String ipAddress,
-        CheckedBiFunction<Reader, String, Optional<T>, Exception> responseProvider
+        CheckedBiFunction<Reader, String, Optional<RESPONSE>, Exception> responseProvider
     ) {
         return cache.putIfAbsent(ipAddress, databasePath.toString(), ip -> {
             try {
@@ -229,24 +227,23 @@ class DatabaseReaderLazyLoader implements IpDatabase, Closeable {
         return md5;
     }
 
-    public void close(boolean shouldDeleteDatabaseFileOnClose) throws IOException {
-        this.deleteDatabaseFileOnClose = shouldDeleteDatabaseFileOnClose;
-        close();
+    public void shutdown(boolean shouldDeleteDatabaseFileOnShutdown) throws IOException {
+        this.deleteDatabaseFileOnShutdown = shouldDeleteDatabaseFileOnShutdown;
+        shutdown();
     }
 
-    @Override
-    public void close() throws IOException {
+    public void shutdown() throws IOException {
         if (currentUsages.updateAndGet(u -> -1 - u) == -1) {
-            doClose();
+            doShutdown();
         }
     }
 
     // Visible for Testing
-    protected void doClose() throws IOException {
+    protected void doShutdown() throws IOException {
         IOUtils.close(databaseReader.get());
         int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(databasePath);
         logger.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath);
-        if (deleteDatabaseFileOnClose) {
+        if (deleteDatabaseFileOnShutdown) {
             logger.info("deleting [{}]", databasePath);
             Files.delete(databasePath);
         }

+ 2 - 2
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java

@@ -60,7 +60,7 @@ final class GeoIpCache {
     }
 
     @SuppressWarnings("unchecked")
-    <T> T putIfAbsent(String ip, String databasePath, Function<String, T> retrieveFunction) {
+    <RESPONSE> RESPONSE putIfAbsent(String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
         // can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
         CacheKey cacheKey = new CacheKey(ip, databasePath);
         long cacheStart = relativeNanoTimeProvider.getAsLong();
@@ -87,7 +87,7 @@ final class GeoIpCache {
         if (response == NO_RESULT) {
             return null; // the no-result sentinel is an internal detail, don't expose it
         } else {
-            return (T) response;
+            return (RESPONSE) response;
         }
     }
 

+ 15 - 21
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

@@ -118,15 +118,14 @@ public final class GeoIpProcessor extends AbstractProcessor {
             throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
         }
 
-        IpDatabase ipDatabase = this.supplier.get();
-        if (ipDatabase == null) {
-            if (ignoreMissing == false) {
-                tag(ingestDocument, databaseFile);
+        try (IpDatabase ipDatabase = this.supplier.get()) {
+            if (ipDatabase == null) {
+                if (ignoreMissing == false) {
+                    tag(ingestDocument, databaseFile);
+                }
+                return ingestDocument;
             }
-            return ingestDocument;
-        }
 
-        try {
             if (ip instanceof String ipString) {
                 Map<String, Object> geoData = getGeoData(ipDatabase, ipString);
                 if (geoData.isEmpty() == false) {
@@ -157,9 +156,8 @@ public final class GeoIpProcessor extends AbstractProcessor {
             } else {
                 throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings");
             }
-        } finally {
-            ipDatabase.release();
         }
+
         return ingestDocument;
     }
 
@@ -742,20 +740,16 @@ public final class GeoIpProcessor extends AbstractProcessor {
                 deprecationLogger.warn(DeprecationCategory.OTHER, "default_databases_message", DEFAULT_DATABASES_DEPRECATION_MESSAGE);
             }
 
-            IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(databaseFile);
-            if (ipDatabase == null) {
-                // It's possible that the database could be downloaded via the GeoipDownloader process and could become available
-                // at a later moment, so a processor impl is returned that tags documents instead. If a database cannot be sourced then the
-                // processor will continue to tag documents with a warning until it is remediated by providing a database or changing the
-                // pipeline.
-                return new DatabaseUnavailableProcessor(processorTag, description, databaseFile);
-            }
-
             final String databaseType;
-            try {
+            try (IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(databaseFile)) {
+                if (ipDatabase == null) {
+                    // It's possible that the database could be downloaded via the GeoipDownloader process and could become available
+                    // at a later moment, so a processor impl is returned that tags documents instead. If a database cannot be sourced
+                    // then the processor will continue to tag documents with a warning until it is remediated by providing a database
+                    // or changing the pipeline.
+                    return new DatabaseUnavailableProcessor(processorTag, description, databaseFile);
+                }
                 databaseType = ipDatabase.getDatabaseType();
-            } finally {
-                ipDatabase.release();
             }
 
             final Database database;

+ 1 - 1
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java

@@ -161,7 +161,7 @@ public class IngestGeoIpPlugin extends Plugin
 
     @Override
     public void close() throws IOException {
-        databaseRegistry.get().close();
+        databaseRegistry.get().shutdown();
     }
 
     @Override

+ 4 - 2
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpDatabase.java

@@ -25,7 +25,7 @@ import java.io.IOException;
 /**
  * Provides a uniform interface for interacting with various ip databases.
  */
-public interface IpDatabase {
+public interface IpDatabase extends AutoCloseable {
 
     /**
      * @return the database type as it is detailed in the database file metadata
@@ -76,7 +76,9 @@ public interface IpDatabase {
     /**
      * Releases the current database object. Called after processing a single document. Databases should be closed or returned to a
      * resource pool. No further interactions should be expected.
+     *
      * @throws IOException if the implementation encounters any problem while cleaning up
      */
-    void release() throws IOException;
+    @Override
+    void close() throws IOException;
 }

+ 1 - 1
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorFactoryTests.java

@@ -83,7 +83,7 @@ public class GeoIpProcessorFactoryTests extends ESTestCase {
 
     @After
     public void closeDatabaseReaders() throws IOException {
-        databaseNodeService.close();
+        databaseNodeService.shutdown();
         databaseNodeService = null;
     }
 

+ 4 - 4
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java

@@ -43,7 +43,7 @@ public class GeoIpProcessorTests extends ESTestCase {
     private static final Set<Property> ALL_PROPERTIES = Set.of(Property.values());
 
     // a temporary directory that mmdb files can be copied to and read from
-    Path tmpDir;
+    private Path tmpDir;
 
     @Before
     public void setup() {
@@ -666,7 +666,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
         // Check the loader's reference count and attempt to close
         assertThat(loader.current(), equalTo(0));
-        loader.close();
+        loader.shutdown();
         assertTrue(closeCheck.get());
     }
 
@@ -797,11 +797,11 @@ public class GeoIpProcessorTests extends ESTestCase {
         final GeoIpCache cache = new GeoIpCache(1000);
         return new DatabaseReaderLazyLoader(cache, path, null) {
             @Override
-            protected void doClose() throws IOException {
+            protected void doShutdown() throws IOException {
                 if (closed != null) {
                     closed.set(true);
                 }
-                super.doClose();
+                super.doShutdown();
             }
         };
     }

+ 1 - 1
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/MMDBUtilTests.java

@@ -26,7 +26,7 @@ import static org.hamcrest.Matchers.is;
 public class MMDBUtilTests extends ESTestCase {
 
     // a temporary directory that mmdb files can be copied to and read from
-    Path tmpDir;
+    private Path tmpDir;
 
     @Before
     public void setup() {