|
@@ -1,8 +1,8 @@
|
|
|
/*
|
|
|
- * Licensed to Elastic Search and Shay Banon under one
|
|
|
+ * Licensed to ElasticSearch and Shay Banon under one
|
|
|
* or more contributor license agreements. See the NOTICE file
|
|
|
* distributed with this work for additional information
|
|
|
- * regarding copyright ownership. Elastic Search licenses this
|
|
|
+ * regarding copyright ownership. ElasticSearch licenses this
|
|
|
* file to you under the Apache License, Version 2.0 (the
|
|
|
* "License"); you may not use this file except in compliance
|
|
|
* with the License. You may obtain a copy of the License at
|
|
@@ -19,6 +19,7 @@
|
|
|
|
|
|
package org.elasticsearch.action.admin.cluster.node.shutdown;
|
|
|
|
|
|
+import com.google.common.collect.Sets;
|
|
|
import org.elasticsearch.ElasticSearchException;
|
|
|
import org.elasticsearch.ElasticSearchIllegalStateException;
|
|
|
import org.elasticsearch.action.TransportActions;
|
|
@@ -27,7 +28,6 @@ import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterService;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
-import org.elasticsearch.common.collect.Sets;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
@@ -37,18 +37,14 @@ import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.node.Node;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
-import org.elasticsearch.transport.BaseTransportRequestHandler;
|
|
|
-import org.elasticsearch.transport.TransportChannel;
|
|
|
-import org.elasticsearch.transport.TransportException;
|
|
|
-import org.elasticsearch.transport.TransportService;
|
|
|
-import org.elasticsearch.transport.VoidTransportResponseHandler;
|
|
|
+import org.elasticsearch.transport.*;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
/**
|
|
|
- * @author kimchy (shay.banon)
|
|
|
+ *
|
|
|
*/
|
|
|
public class TransportNodesShutdownAction extends TransportMasterNodeOperationAction<NodesShutdownRequest, NodesShutdownResponse> {
|
|
|
|
|
@@ -60,8 +56,9 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
|
|
|
private final TimeValue delay;
|
|
|
|
|
|
- @Inject public TransportNodesShutdownAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
|
|
- Node node, ClusterName clusterName) {
|
|
|
+ @Inject
|
|
|
+ public TransportNodesShutdownAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
|
|
+ Node node, ClusterName clusterName) {
|
|
|
super(settings, transportService, clusterService, threadPool);
|
|
|
this.node = node;
|
|
|
this.clusterName = clusterName;
|
|
@@ -71,23 +68,28 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
this.transportService.registerHandler(NodeShutdownRequestHandler.ACTION, new NodeShutdownRequestHandler());
|
|
|
}
|
|
|
|
|
|
- @Override protected String executor() {
|
|
|
+ @Override
|
|
|
+ protected String executor() {
|
|
|
return ThreadPool.Names.CACHED;
|
|
|
}
|
|
|
|
|
|
- @Override protected String transportAction() {
|
|
|
+ @Override
|
|
|
+ protected String transportAction() {
|
|
|
return TransportActions.Admin.Cluster.Node.SHUTDOWN;
|
|
|
}
|
|
|
|
|
|
- @Override protected NodesShutdownRequest newRequest() {
|
|
|
+ @Override
|
|
|
+ protected NodesShutdownRequest newRequest() {
|
|
|
return new NodesShutdownRequest();
|
|
|
}
|
|
|
|
|
|
- @Override protected NodesShutdownResponse newResponse() {
|
|
|
+ @Override
|
|
|
+ protected NodesShutdownResponse newResponse() {
|
|
|
return new NodesShutdownResponse();
|
|
|
}
|
|
|
|
|
|
- @Override protected void processBeforeDelegationToMaster(NodesShutdownRequest request, ClusterState state) {
|
|
|
+ @Override
|
|
|
+ protected void processBeforeDelegationToMaster(NodesShutdownRequest request, ClusterState state) {
|
|
|
String[] nodesIds = request.nodesIds;
|
|
|
if (nodesIds != null) {
|
|
|
for (int i = 0; i < nodesIds.length; i++) {
|
|
@@ -99,7 +101,8 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override protected NodesShutdownResponse masterOperation(final NodesShutdownRequest request, final ClusterState state) throws ElasticSearchException {
|
|
|
+ @Override
|
|
|
+ protected NodesShutdownResponse masterOperation(final NodesShutdownRequest request, final ClusterState state) throws ElasticSearchException {
|
|
|
if (disabled) {
|
|
|
throw new ElasticSearchIllegalStateException("Shutdown is disabled");
|
|
|
}
|
|
@@ -109,7 +112,8 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
nodes.addAll(state.nodes().dataNodes().values());
|
|
|
nodes.addAll(state.nodes().masterNodes().values());
|
|
|
Thread t = new Thread(new Runnable() {
|
|
|
- @Override public void run() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
try {
|
|
|
Thread.sleep(request.delay.millis());
|
|
|
} catch (InterruptedException e) {
|
|
@@ -127,12 +131,14 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
} else {
|
|
|
logger.trace("[cluster_shutdown]: sending shutdown request to [{}]", node);
|
|
|
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request.exit), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
|
|
|
- @Override public void handleResponse(VoidStreamable response) {
|
|
|
+ @Override
|
|
|
+ public void handleResponse(VoidStreamable response) {
|
|
|
logger.trace("[cluster_shutdown]: received shutdown response from [{}]", node);
|
|
|
latch.countDown();
|
|
|
}
|
|
|
|
|
|
- @Override public void handleException(TransportException exp) {
|
|
|
+ @Override
|
|
|
+ public void handleException(TransportException exp) {
|
|
|
logger.warn("[cluster_shutdown]: received failed shutdown response from [{}]", exp, node);
|
|
|
latch.countDown();
|
|
|
}
|
|
@@ -149,11 +155,13 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
// now, kill the master
|
|
|
logger.trace("[cluster_shutdown]: shutting down the master [{}]", state.nodes().masterNode());
|
|
|
transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request.exit), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
|
|
|
- @Override public void handleResponse(VoidStreamable response) {
|
|
|
+ @Override
|
|
|
+ public void handleResponse(VoidStreamable response) {
|
|
|
logger.trace("[cluster_shutdown]: received shutdown response from master");
|
|
|
}
|
|
|
|
|
|
- @Override public void handleException(TransportException exp) {
|
|
|
+ @Override
|
|
|
+ public void handleException(TransportException exp) {
|
|
|
logger.warn("[cluster_shutdown]: received failed shutdown response master", exp);
|
|
|
}
|
|
|
});
|
|
@@ -172,7 +180,8 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
}
|
|
|
|
|
|
Thread t = new Thread(new Runnable() {
|
|
|
- @Override public void run() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
try {
|
|
|
Thread.sleep(request.delay.millis());
|
|
|
} catch (InterruptedException e) {
|
|
@@ -190,12 +199,14 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
|
|
|
logger.trace("[partial_cluster_shutdown]: sending shutdown request to [{}]", node);
|
|
|
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request.exit), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
|
|
|
- @Override public void handleResponse(VoidStreamable response) {
|
|
|
+ @Override
|
|
|
+ public void handleResponse(VoidStreamable response) {
|
|
|
logger.trace("[partial_cluster_shutdown]: received shutdown response from [{}]", node);
|
|
|
latch.countDown();
|
|
|
}
|
|
|
|
|
|
- @Override public void handleException(TransportException exp) {
|
|
|
+ @Override
|
|
|
+ public void handleException(TransportException exp) {
|
|
|
logger.warn("[partial_cluster_shutdown]: received failed shutdown response from [{}]", exp, node);
|
|
|
latch.countDown();
|
|
|
}
|
|
@@ -220,21 +231,25 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
|
|
|
static final String ACTION = "/cluster/nodes/shutdown/node";
|
|
|
|
|
|
- @Override public NodeShutdownRequest newInstance() {
|
|
|
+ @Override
|
|
|
+ public NodeShutdownRequest newInstance() {
|
|
|
return new NodeShutdownRequest();
|
|
|
}
|
|
|
|
|
|
- @Override public String executor() {
|
|
|
+ @Override
|
|
|
+ public String executor() {
|
|
|
return ThreadPool.Names.SAME;
|
|
|
}
|
|
|
|
|
|
- @Override public void messageReceived(final NodeShutdownRequest request, TransportChannel channel) throws Exception {
|
|
|
+ @Override
|
|
|
+ public void messageReceived(final NodeShutdownRequest request, TransportChannel channel) throws Exception {
|
|
|
if (disabled) {
|
|
|
throw new ElasticSearchIllegalStateException("Shutdown is disabled");
|
|
|
}
|
|
|
logger.info("shutting down in [{}]", delay);
|
|
|
Thread t = new Thread(new Runnable() {
|
|
|
- @Override public void run() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
try {
|
|
|
Thread.sleep(delay.millis());
|
|
|
} catch (InterruptedException e) {
|
|
@@ -290,11 +305,13 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
|
|
|
this.exit = exit;
|
|
|
}
|
|
|
|
|
|
- @Override public void readFrom(StreamInput in) throws IOException {
|
|
|
+ @Override
|
|
|
+ public void readFrom(StreamInput in) throws IOException {
|
|
|
exit = in.readBoolean();
|
|
|
}
|
|
|
|
|
|
- @Override public void writeTo(StreamOutput out) throws IOException {
|
|
|
+ @Override
|
|
|
+ public void writeTo(StreamOutput out) throws IOException {
|
|
|
out.writeBoolean(exit);
|
|
|
}
|
|
|
}
|