|
@@ -19,6 +19,7 @@
|
|
|
|
|
|
package org.elasticsearch.client.transport;
|
|
|
|
|
|
+import org.apache.lucene.util.IOUtils;
|
|
|
import org.elasticsearch.action.Action;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.ActionModule;
|
|
@@ -41,7 +42,6 @@ import org.elasticsearch.common.settings.SettingsModule;
|
|
|
import org.elasticsearch.common.transport.TransportAddress;
|
|
|
import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
|
|
-import org.elasticsearch.monitor.MonitorService;
|
|
|
import org.elasticsearch.node.Node;
|
|
|
import org.elasticsearch.node.internal.InternalSettingsPreparer;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
@@ -52,6 +52,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.elasticsearch.transport.netty.NettyTransport;
|
|
|
|
|
|
+import java.io.Closeable;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -118,10 +119,11 @@ public class TransportClient extends AbstractClient {
|
|
|
public TransportClient build() {
|
|
|
final PluginsService pluginsService = newPluginService(providedSettings);
|
|
|
final Settings settings = pluginsService.updatedSettings();
|
|
|
+ final List<Closeable> resourcesToClose = new ArrayList<>();
|
|
|
final ThreadPool threadPool = new ThreadPool(settings);
|
|
|
+ resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
|
|
|
final NetworkService networkService = new NetworkService(settings);
|
|
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
|
|
|
- boolean success = false;
|
|
|
try {
|
|
|
ModulesBuilder modules = new ModulesBuilder();
|
|
|
// plugin modules must be added here, before others or we can get crazy injection errors...
|
|
@@ -149,8 +151,12 @@ public class TransportClient extends AbstractClient {
|
|
|
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
|
|
|
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
|
|
|
settingsModule.getClusterSettings());
|
|
|
+ resourcesToClose.add(circuitBreakerService);
|
|
|
+ BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
|
|
|
+ resourcesToClose.add(bigArrays);
|
|
|
modules.add(settingsModule);
|
|
|
modules.add((b -> {
|
|
|
+ b.bind(BigArrays.class).toInstance(bigArrays);
|
|
|
b.bind(PluginsService.class).toInstance(pluginsService);
|
|
|
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
|
|
|
}));
|
|
@@ -159,14 +165,11 @@ public class TransportClient extends AbstractClient {
|
|
|
final TransportService transportService = injector.getInstance(TransportService.class);
|
|
|
transportService.start();
|
|
|
transportService.acceptIncomingRequests();
|
|
|
-
|
|
|
TransportClient transportClient = new TransportClient(injector);
|
|
|
- success = true;
|
|
|
+ resourcesToClose.clear();
|
|
|
return transportClient;
|
|
|
} finally {
|
|
|
- if (!success) {
|
|
|
- ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
|
|
|
- }
|
|
|
+ IOUtils.closeWhileHandlingException(resourcesToClose);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -261,24 +264,16 @@ public class TransportClient extends AbstractClient {
|
|
|
*/
|
|
|
@Override
|
|
|
public void close() {
|
|
|
- injector.getInstance(TransportClientNodesService.class).close();
|
|
|
- injector.getInstance(TransportService.class).close();
|
|
|
- try {
|
|
|
- injector.getInstance(MonitorService.class).close();
|
|
|
- } catch (Exception e) {
|
|
|
- // ignore, might not be bounded
|
|
|
- }
|
|
|
+ List<Closeable> closeables = new ArrayList<>();
|
|
|
+ closeables.add(injector.getInstance(TransportClientNodesService.class));
|
|
|
+ closeables.add(injector.getInstance(TransportService.class));
|
|
|
|
|
|
for (Class<? extends LifecycleComponent> plugin : injector.getInstance(PluginsService.class).nodeServices()) {
|
|
|
- injector.getInstance(plugin).close();
|
|
|
+ closeables.add(injector.getInstance(plugin));
|
|
|
}
|
|
|
- try {
|
|
|
- ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS);
|
|
|
- } catch (Exception e) {
|
|
|
- // ignore
|
|
|
- }
|
|
|
-
|
|
|
- injector.getInstance(BigArrays.class).close();
|
|
|
+ closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
|
|
|
+ closeables.add(injector.getInstance(BigArrays.class));
|
|
|
+ IOUtils.closeWhileHandlingException(closeables);
|
|
|
}
|
|
|
|
|
|
@Override
|