|
@@ -38,7 +38,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.io.stream.Writeable;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
-import org.elasticsearch.discovery.Discovery;
|
|
|
+import org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException;
|
|
|
import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
|
|
import org.elasticsearch.node.NodeClosedException;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
@@ -53,13 +53,15 @@ import java.util.function.Supplier;
|
|
|
/**
|
|
|
* A base class for operations that needs to be performed on the master node.
|
|
|
*/
|
|
|
-public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
|
|
|
+public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse>
|
|
|
+ extends HandledTransportAction<Request, Response> {
|
|
|
+
|
|
|
protected final ThreadPool threadPool;
|
|
|
protected final TransportService transportService;
|
|
|
protected final ClusterService clusterService;
|
|
|
protected final IndexNameExpressionResolver indexNameExpressionResolver;
|
|
|
|
|
|
- final String executor;
|
|
|
+ private final String executor;
|
|
|
|
|
|
protected TransportMasterNodeAction(Settings settings, String actionName, TransportService transportService,
|
|
|
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
|
|
@@ -75,7 +77,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|
|
|
|
|
protected TransportMasterNodeAction(Settings settings, String actionName, boolean canTripCircuitBreaker,
|
|
|
TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
|
|
- ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
|
|
|
+ ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
|
|
+ Supplier<Request> request) {
|
|
|
super(settings, actionName, canTripCircuitBreaker, transportService, actionFilters, request);
|
|
|
this.transportService = transportService;
|
|
|
this.clusterService = clusterService;
|
|
@@ -138,7 +141,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|
|
|
|
|
public void start() {
|
|
|
ClusterState state = clusterService.state();
|
|
|
- this.observer = new ClusterStateObserver(state, clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext());
|
|
|
+ this.observer
|
|
|
+ = new ClusterStateObserver(state, clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext());
|
|
|
doStart(state);
|
|
|
}
|
|
|
|
|
@@ -174,16 +178,16 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|
|
|
|
|
@Override
|
|
|
public void onFailure(Exception t) {
|
|
|
- if (t instanceof Discovery.FailedToCommitClusterStateException
|
|
|
- || (t instanceof NotMasterException)) {
|
|
|
- logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
|
|
|
+ if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
|
|
|
+ logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
|
|
|
+ "stepped down before publishing action [{}], scheduling a retry", actionName), t);
|
|
|
retry(t, masterChangePredicate);
|
|
|
} else {
|
|
|
listener.onFailure(t);
|
|
|
}
|
|
|
}
|
|
|
};
|
|
|
- threadPool.executor(executor).execute(new ActionRunnable(delegate) {
|
|
|
+ threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) {
|
|
|
@Override
|
|
|
protected void doRun() throws Exception {
|
|
|
masterOperation(task, request, clusterState, delegate);
|
|
@@ -204,7 +208,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|
|
Throwable cause = exp.unwrapCause();
|
|
|
if (cause instanceof ConnectTransportException) {
|
|
|
// we want to retry here a bit to see if a new master is elected
|
|
|
- logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
|
|
|
+ logger.debug("connection exception while trying to forward request with action name [{}] to " +
|
|
|
+ "master node [{}], scheduling a retry. Error: [{}]",
|
|
|
actionName, nodes.getMasterNode(), exp.getDetailedMessage());
|
|
|
retry(cause, masterChangePredicate);
|
|
|
} else {
|
|
@@ -234,7 +239,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|
|
|
|
|
@Override
|
|
|
public void onTimeout(TimeValue timeout) {
|
|
|
- logger.debug(() -> new ParameterizedMessage("timed out while retrying [{}] after failure (timeout [{}])", actionName, timeout), failure);
|
|
|
+ logger.debug(() -> new ParameterizedMessage("timed out while retrying [{}] after failure (timeout [{}])",
|
|
|
+ actionName, timeout), failure);
|
|
|
listener.onFailure(new MasterNotDiscoveredException(failure));
|
|
|
}
|
|
|
}, statePredicate
|