1
0
Эх сурвалжийг харах

Merge pull request #19137 from rjernst/closeable_plugins

Make plugins closeable
Ryan Ernst 9 жил өмнө
parent
commit
4dcb2b8024

+ 1 - 7
core/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -33,7 +33,7 @@ import java.util.Map;
 /**
  * Holder class for several ingest related services.
  */
-public class IngestService implements Closeable {
+public class IngestService {
 
     private final PipelineStore pipelineStore;
     private final PipelineExecutionService pipelineExecutionService;
@@ -65,10 +65,4 @@ public class IngestService implements Closeable {
         }
         return new IngestInfo(processorInfoList);
     }
-
-    @Override
-    public void close() throws IOException {
-        pipelineStore.close();
-    }
-
 }

+ 1 - 8
core/src/main/java/org/elasticsearch/ingest/PipelineStore.java

@@ -47,7 +47,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener {
+public class PipelineStore extends AbstractComponent implements ClusterStateListener {
 
     private final Pipeline.Factory factory = new Pipeline.Factory();
     private ProcessorsRegistry processorRegistry;
@@ -67,13 +67,6 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
         this.processorRegistry = processorsRegistryBuilder.build(scriptService, clusterService);
     }
 
-    @Override
-    public void close() throws IOException {
-        // TODO: When org.elasticsearch.node.Node can close Closable instances we should try to remove this code,
-        // since any wired closable should be able to close itself
-        processorRegistry.close();
-    }
-
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
         innerUpdatePipelines(event.state());

+ 4 - 18
core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java

@@ -19,20 +19,17 @@
 
 package org.elasticsearch.ingest;
 
-import org.apache.lucene.util.IOUtils;
-import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.script.ScriptService;
-
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
-public final class ProcessorsRegistry implements Closeable {
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.script.ScriptService;
+
+public final class ProcessorsRegistry {
 
     private final Map<String, Processor.Factory> processorFactories;
     private final TemplateService templateService;
@@ -67,17 +64,6 @@ public final class ProcessorsRegistry implements Closeable {
         return processorFactories.get(name);
     }
 
-    @Override
-    public void close() throws IOException {
-        List<Closeable> closeables = new ArrayList<>();
-        for (Processor.Factory factory : processorFactories.values()) {
-            if (factory instanceof Closeable) {
-                closeables.add((Closeable) factory);
-            }
-        }
-        IOUtils.close(closeables);
-    }
-
     // For testing:
     Map<String, Processor.Factory> getProcessorFactories() {
         return processorFactories;

+ 1 - 0
core/src/main/java/org/elasticsearch/node/Node.java

@@ -552,6 +552,7 @@ public class Node implements Closeable {
             toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getName() + ")"));
             toClose.add(injector.getInstance(plugin));
         }
+        toClose.addAll(pluginsService.filterPlugins(Closeable.class));
 
         toClose.add(() -> stopWatch.stop().start("script"));
         toClose.add(injector.getInstance(ScriptService.class));

+ 0 - 1
core/src/main/java/org/elasticsearch/node/service/NodeService.java

@@ -199,7 +199,6 @@ public class NodeService extends AbstractComponent implements Closeable {
 
     @Override
     public void close() throws IOException {
-        ingestService.close();
         indicesService.close();
     }
 }

+ 1 - 6
plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

@@ -217,7 +217,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
         return geoData;
     }
 
-    public static final class Factory extends AbstractProcessorFactory<GeoIpProcessor> implements Closeable {
+    public static final class Factory extends AbstractProcessorFactory<GeoIpProcessor> {
         static final Set<Property> DEFAULT_CITY_PROPERTIES = EnumSet.of(
             Property.CONTINENT_NAME, Property.COUNTRY_ISO_CODE, Property.REGION_NAME,
             Property.CITY_NAME, Property.LOCATION
@@ -267,11 +267,6 @@ public final class GeoIpProcessor extends AbstractProcessor {
 
             return new GeoIpProcessor(processorTag, ipField, databaseReader, targetField, properties);
         }
-
-        @Override
-        public void close() throws IOException {
-            IOUtils.close(databaseReaders.values());
-        }
     }
 
     // Geoip2's AddressNotFoundException is checked and due to the fact that we need run their code

+ 16 - 2
plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java

@@ -20,9 +20,11 @@
 package org.elasticsearch.ingest.geoip;
 
 import com.maxmind.geoip2.DatabaseReader;
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.node.NodeModule;
 import org.elasticsearch.plugins.Plugin;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -36,11 +38,16 @@ import java.util.Map;
 import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
 
-public class IngestGeoIpPlugin extends Plugin {
+public class IngestGeoIpPlugin extends Plugin implements Closeable {
+
+    private Map<String, DatabaseReader> databaseReaders;
 
     public void onModule(NodeModule nodeModule) throws IOException {
+        if (databaseReaders != null) {
+            throw new IllegalStateException("called onModule twice for geoip plugin!!");
+        }
         Path geoIpConfigDirectory = nodeModule.getNode().getEnvironment().configFile().resolve("ingest-geoip");
-        Map<String, DatabaseReader> databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
+        databaseReaders = loadDatabaseReaders(geoIpConfigDirectory);
         nodeModule.registerProcessor(GeoIpProcessor.TYPE, (registry) -> new GeoIpProcessor.Factory(databaseReaders));
     }
 
@@ -65,4 +72,11 @@ public class IngestGeoIpPlugin extends Plugin {
         }
         return Collections.unmodifiableMap(databaseReaders);
     }
+
+    @Override
+    public void close() throws IOException {
+        if (databaseReaders != null) {
+            IOUtils.close(databaseReaders.values());
+        }
+    }
 }