浏览代码

ThreadPool: make sure no leaking threads are left behind in case of initialization failure

Our ThreadPool constructor creates a couple of threads (scheduler and timer) which might not get shut down if the initialization of a node fails. A guice error might occur for example, which causes the InternalNode constructor to throw an exception. In this case the two threads are left behind, which is not a big problem when running es standalone as the error will be intercepted and the jvm will be stopped as a whole. It can become more of a problem though when running es in embedded mode, as we'll end up with lingering threads or testing an handling of initialization failures.

Closes #9107
Igor Motov 10 年之前
父节点
当前提交
573cacab54

+ 29 - 18
src/main/java/org/elasticsearch/client/transport/TransportClient.java

@@ -43,6 +43,7 @@ import org.elasticsearch.env.EnvironmentModule;
 import org.elasticsearch.indices.breaker.CircuitBreakerModule;
 import org.elasticsearch.monitor.MonitorService;
 import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.plugins.PluginsModule;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.search.TransportSearchModule;
@@ -125,24 +126,34 @@ public class TransportClient extends AbstractClient {
 
             CompressorFactory.configure(this.settings);
 
-            ModulesBuilder modules = new ModulesBuilder();
-            modules.add(new Version.Module(version));
-            modules.add(new PluginsModule(this.settings, pluginsService));
-            modules.add(new EnvironmentModule(environment));
-            modules.add(new SettingsModule(this.settings));
-            modules.add(new NetworkModule());
-            modules.add(new ClusterNameModule(this.settings));
-            modules.add(new ThreadPoolModule(this.settings));
-            modules.add(new TransportSearchModule());
-            modules.add(new TransportModule(this.settings));
-            modules.add(new ActionModule(true));
-            modules.add(new ClientTransportModule());
-            modules.add(new CircuitBreakerModule(this.settings));
-
-            Injector injector = modules.createInjector();
-            injector.getInstance(TransportService.class).start();
-
-            return new TransportClient(injector);
+            final ThreadPool threadPool = new ThreadPool(settings);
+
+            boolean success = false;
+            try {
+                ModulesBuilder modules = new ModulesBuilder();
+                modules.add(new Version.Module(version));
+                modules.add(new PluginsModule(this.settings, pluginsService));
+                modules.add(new EnvironmentModule(environment));
+                modules.add(new SettingsModule(this.settings));
+                modules.add(new NetworkModule());
+                modules.add(new ClusterNameModule(this.settings));
+                modules.add(new ThreadPoolModule(threadPool));
+                modules.add(new TransportSearchModule());
+                modules.add(new TransportModule(this.settings));
+                modules.add(new ActionModule(true));
+                modules.add(new ClientTransportModule());
+                modules.add(new CircuitBreakerModule(this.settings));
+
+                Injector injector = modules.createInjector();
+                injector.getInstance(TransportService.class).start();
+                TransportClient transportClient = new TransportClient(injector);
+                success = true;
+                return transportClient;
+            } finally {
+                if (!success) {
+                    ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+                }
+            }
         }
     }
 

+ 6 - 1
src/main/java/org/elasticsearch/node/Node.java

@@ -75,6 +75,7 @@ import org.elasticsearch.monitor.MonitorService;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.node.internal.InternalSettingsPreparer;
 import org.elasticsearch.node.internal.NodeModule;
+import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.percolator.PercolatorModule;
 import org.elasticsearch.percolator.PercolatorService;
 import org.elasticsearch.plugins.PluginsModule;
@@ -159,6 +160,8 @@ public class Node implements Releasable {
             throw new IllegalStateException("Failed to created node environment", ex);
         }
 
+        final ThreadPool threadPool = new ThreadPool(settings);
+
         boolean success = false;
         try {
             ModulesBuilder modules = new ModulesBuilder();
@@ -174,7 +177,7 @@ public class Node implements Releasable {
             modules.add(new EnvironmentModule(environment));
             modules.add(new NodeEnvironmentModule(nodeEnvironment));
             modules.add(new ClusterNameModule(settings));
-            modules.add(new ThreadPoolModule(settings));
+            modules.add(new ThreadPoolModule(threadPool));
             modules.add(new DiscoveryModule(settings));
             modules.add(new ClusterModule(settings));
             modules.add(new RestModule(settings));
@@ -198,10 +201,12 @@ public class Node implements Releasable {
             injector = modules.createInjector();
 
             client = injector.getInstance(Client.class);
+            threadPool.setNodeSettingsService(injector.getInstance(NodeSettingsService.class));
             success = true;
         } finally {
             if (!success) {
                 nodeEnvironment.close();
+                ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
             }
         }
 

+ 12 - 6
src/main/java/org/elasticsearch/threadpool/ThreadPool.java

@@ -91,13 +91,14 @@ public class ThreadPool extends AbstractComponent {
 
     private final EstimatedTimeThread estimatedTimeThread;
 
+    private boolean settingsListenerIsSet = false;
+
 
     public ThreadPool(String name) {
-        this(ImmutableSettings.builder().put("name", name).build(), null);
+        this(ImmutableSettings.builder().put("name", name).build());
     }
 
-    @Inject
-    public ThreadPool(Settings settings, @Nullable NodeSettingsService nodeSettingsService) {
+    public ThreadPool(Settings settings) {
         super(settings);
 
         assert settings.get("name") != null : "ThreadPool's settings should contain a name";
@@ -148,15 +149,20 @@ public class ThreadPool extends AbstractComponent {
         this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
         this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
         this.scheduler.setRemoveOnCancelPolicy(true);
-        if (nodeSettingsService != null) {
-            nodeSettingsService.addListener(new ApplySettings());
-        }
 
         TimeValue estimatedTimeInterval = settings.getAsTime("threadpool.estimated_time_interval", TimeValue.timeValueMillis(200));
         this.estimatedTimeThread = new EstimatedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
         this.estimatedTimeThread.start();
     }
 
+    public void setNodeSettingsService(NodeSettingsService nodeSettingsService) {
+        if(settingsListenerIsSet) {
+            throw new IllegalStateException("the node settings listener was set more then once");
+        }
+        nodeSettingsService.addListener(new ApplySettings());
+        settingsListenerIsSet = true;
+    }
+
     public long estimatedTimeInMillis() {
         return estimatedTimeThread.estimatedTimeInMillis();
     }

+ 4 - 4
src/main/java/org/elasticsearch/threadpool/ThreadPoolModule.java

@@ -27,14 +27,14 @@ import org.elasticsearch.common.settings.Settings;
  */
 public class ThreadPoolModule extends AbstractModule {
 
-    private final Settings settings;
+    private final ThreadPool threadPool;
 
-    public ThreadPoolModule(Settings settings) {
-        this.settings = settings;
+    public ThreadPoolModule(ThreadPool threadPool) {
+        this.threadPool = threadPool;
     }
 
     @Override
     protected void configure() {
-        bind(ThreadPool.class).asEagerSingleton();
+        bind(ThreadPool.class).toInstance(threadPool);
     }
 }

+ 1 - 1
src/test/java/org/elasticsearch/index/query/TemplateQueryParserTest.java

@@ -77,7 +77,7 @@ public class TemplateQueryParserTest extends ElasticsearchTestCase {
         injector = new ModulesBuilder().add(
                 new EnvironmentModule(new Environment(settings)),
                 new SettingsModule(settings),
-                new ThreadPoolModule(settings),
+                new ThreadPoolModule(new ThreadPool(settings)),
                 new IndicesQueriesModule(),
                 new ScriptModule(settings),
                 new IndexSettingsModule(index, settings),

+ 1 - 1
src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPlugin2Tests.java

@@ -70,7 +70,7 @@ public class IndexQueryParserPlugin2Tests extends ElasticsearchTestCase {
         Injector injector = new ModulesBuilder().add(
                 new EnvironmentModule(new Environment(settings)),
                 new SettingsModule(settings),
-                new ThreadPoolModule(settings),
+                new ThreadPoolModule(new ThreadPool(settings)),
                 new IndicesQueriesModule(),
                 new ScriptModule(settings),
                 new IndexSettingsModule(index, settings),

+ 1 - 1
src/test/java/org/elasticsearch/index/query/plugin/IndexQueryParserPluginTests.java

@@ -75,7 +75,7 @@ public class IndexQueryParserPluginTests extends ElasticsearchTestCase {
         Injector injector = new ModulesBuilder().add(
                 new EnvironmentModule(new Environment(settings)),
                 new SettingsModule(settings),
-                new ThreadPoolModule(settings),
+                new ThreadPoolModule(new ThreadPool(settings)),
                 new IndicesQueriesModule(),
                 new ScriptModule(settings),
                 new IndexSettingsModule(index, settings),

+ 1 - 1
src/test/java/org/elasticsearch/script/NativeScriptTests.java

@@ -55,7 +55,7 @@ public class NativeScriptTests extends ElasticsearchTestCase {
                 .build();
         Injector injector = new ModulesBuilder().add(
                 new EnvironmentModule(new Environment(settings)),
-                new ThreadPoolModule(settings),
+                new ThreadPoolModule(new ThreadPool(settings)),
                 new SettingsModule(settings),
                 new ScriptModule(settings)).createInjector();
 

+ 19 - 0
src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.NodeBuilder;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
 import org.elasticsearch.test.ElasticsearchSingleNodeTest;
@@ -191,6 +192,24 @@ public class SimpleThreadPoolTests extends ElasticsearchIntegrationTest {
         }
     }
 
+    @Test
+    public void testThreadPoolLeakingThreadsWithTribeNode() {
+        Settings settings = ImmutableSettings.builder()
+                .put("node.name", "thread_pool_leaking_threads_tribe_node")
+                .put("path.home", createTempDir())
+                .put("tribe.t1.cluster.name", "non_existing_cluster")
+                        //trigger initialization failure of one of the tribes (doesn't require starting the node)
+                .put("tribe.t1.plugin.mandatory", "non_existing").build();
+
+        try {
+            NodeBuilder.nodeBuilder().settings(settings).build();
+            fail("The node startup is supposed to fail");
+        } catch(Throwable t) {
+            //all good
+            assertThat(t.getMessage(), containsString("mandatory plugins [non_existing]"));
+        }
+    }
+
     private Map<String, Object> getPoolSettingsThroughJson(ThreadPoolInfo info, String poolName) throws IOException {
         XContentBuilder builder = XContentFactory.jsonBuilder();
         builder.startObject();

+ 1 - 1
src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java

@@ -95,7 +95,7 @@ public class ThreadPoolSerializationTests extends ElasticsearchTestCase {
     @Test
     public void testThatNegativeSettingAllowsToStart() throws InterruptedException {
         Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build();
-        ThreadPool threadPool = new ThreadPool(settings, null);
+        ThreadPool threadPool = new ThreadPool(settings);
         assertThat(threadPool.info("index").getQueueSize(), is(nullValue()));
         terminate(threadPool);
     }

+ 5 - 5
src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java

@@ -54,7 +54,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
         ThreadPool threadPool = new ThreadPool(
                 ImmutableSettings.settingsBuilder()
                         .put("threadpool.search.type", "cached")
-                        .put("name","testCachedExecutorType").build(), null);
+                        .put("name","testCachedExecutorType").build());
 
         assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
         assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
@@ -109,7 +109,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
     public void testFixedExecutorType() throws InterruptedException {
         ThreadPool threadPool = new ThreadPool(settingsBuilder()
                 .put("threadpool.search.type", "fixed")
-                .put("name","testCachedExecutorType").build(), null);
+                .put("name","testCachedExecutorType").build());
 
         assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
 
@@ -170,7 +170,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
         ThreadPool threadPool = new ThreadPool(settingsBuilder()
                 .put("threadpool.search.type", "scaling")
                 .put("threadpool.search.size", 10)
-                .put("name","testCachedExecutorType").build(), null);
+                .put("name","testCachedExecutorType").build());
 
         assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1));
         assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
@@ -204,7 +204,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
     public void testShutdownDownNowDoesntBlock() throws Exception {
         ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder()
                 .put("threadpool.search.type", "cached")
-                .put("name","testCachedExecutorType").build(), null);
+                .put("name","testCachedExecutorType").build());
 
         final CountDownLatch latch = new CountDownLatch(1);
         Executor oldExecutor = threadPool.executor(Names.SEARCH);
@@ -236,7 +236,7 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
                 .put("threadpool.my_pool2.type", "fixed")
                 .put("threadpool.my_pool2.size", "1")
                 .put("threadpool.my_pool2.queue_size", "1")
-                .put("name", "testCustomThreadPool").build(), null);
+                .put("name", "testCustomThreadPool").build());
 
         ThreadPoolInfo groups = threadPool.info();
         boolean foundPool1 = false;

+ 2 - 2
src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java

@@ -58,8 +58,8 @@ public class NettySizeHeaderFrameDecoderTests extends ElasticsearchTestCase {
 
     @Before
     public void startThreadPool() {
-        threadPool = new ThreadPool(settings, new NodeSettingsService(settings));
-
+        threadPool = new ThreadPool(settings);
+        threadPool.setNodeSettingsService(new NodeSettingsService(settings));
         NetworkService networkService = new NetworkService(settings);
         BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
         nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT);