|
|
@@ -26,10 +26,10 @@ import org.elasticsearch.action.ActionRunnable;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
|
import org.elasticsearch.action.support.HandledTransportAction;
|
|
|
import org.elasticsearch.action.support.ThreadedActionListener;
|
|
|
-import org.elasticsearch.cluster.ClusterChangedEvent;
|
|
|
import org.elasticsearch.cluster.ClusterService;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateObserver;
|
|
|
+import org.elasticsearch.cluster.MasterNodeChangePredicate;
|
|
|
import org.elasticsearch.cluster.NotMasterException;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
|
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|
|
@@ -51,20 +51,6 @@ import java.util.function.Supplier;
|
|
|
* A base class for operations that needs to be performed on the master node.
|
|
|
*/
|
|
|
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
|
|
|
- private static final ClusterStateObserver.ChangePredicate masterNodeChangedPredicate = new ClusterStateObserver.ChangePredicate() {
|
|
|
- @Override
|
|
|
- public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
|
|
|
- ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
|
|
|
- // The condition !newState.nodes().masterNodeId().equals(previousState.nodes().masterNodeId()) is not sufficient as the same master node might get reelected after a disruption.
|
|
|
- return newState.nodes().masterNodeId() != null && newState != previousState;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public boolean apply(ClusterChangedEvent event) {
|
|
|
- return event.nodesDelta().masterNodeChanged();
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
protected final TransportService transportService;
|
|
|
protected final ClusterService clusterService;
|
|
|
|
|
|
@@ -164,7 +150,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|
|
if (t instanceof Discovery.FailedToCommitClusterStateException
|
|
|
|| (t instanceof NotMasterException)) {
|
|
|
logger.debug("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", t, actionName);
|
|
|
- retry(t, masterNodeChangedPredicate);
|
|
|
+ retry(t, MasterNodeChangePredicate.INSTANCE);
|
|
|
} else {
|
|
|
listener.onFailure(t);
|
|
|
}
|
|
|
@@ -180,7 +166,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|
|
} else {
|
|
|
if (nodes.masterNode() == null) {
|
|
|
logger.debug("no known master node, scheduling a retry");
|
|
|
- retry(null, masterNodeChangedPredicate);
|
|
|
+ retry(null, MasterNodeChangePredicate.INSTANCE);
|
|
|
} else {
|
|
|
transportService.sendRequest(nodes.masterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener) {
|
|
|
@Override
|
|
|
@@ -195,7 +181,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|
|
// we want to retry here a bit to see if a new master is elected
|
|
|
logger.debug("connection exception while trying to forward request with action name [{}] to master node [{}], scheduling a retry. Error: [{}]",
|
|
|
actionName, nodes.masterNode(), exp.getDetailedMessage());
|
|
|
- retry(cause, masterNodeChangedPredicate);
|
|
|
+ retry(cause, MasterNodeChangePredicate.INSTANCE);
|
|
|
} else {
|
|
|
listener.onFailure(exp);
|
|
|
}
|