|
@@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.BiFunction;
|
|
|
+import java.util.function.Consumer;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
|
@@ -207,7 +208,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|
|
joinThreadControl.start();
|
|
|
zenPing.start(this);
|
|
|
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
|
|
|
- this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::rejoin, logger);
|
|
|
+ this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -306,18 +307,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|
|
} catch (FailedToCommitClusterStateException t) {
|
|
|
// cluster service logs a WARN message
|
|
|
logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", clusterChangedEvent.state().version(), electMaster.minimumMasterNodes());
|
|
|
- clusterService.submitStateUpdateTask("zen-disco-failed-to-publish", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
|
|
- @Override
|
|
|
- public ClusterState execute(ClusterState currentState) {
|
|
|
- return rejoin(currentState, "failed to publish to min_master_nodes");
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(String source, Exception e) {
|
|
|
- logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
|
|
- }
|
|
|
-
|
|
|
- });
|
|
|
+ submitRejoin("zen-disco-failed-to-publish");
|
|
|
throw t;
|
|
|
}
|
|
|
|
|
@@ -505,12 +495,27 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void submitRejoin(String source) {
|
|
|
+ clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
|
|
+ @Override
|
|
|
+ public ClusterState execute(ClusterState currentState) {
|
|
|
+ return rejoin(currentState, source);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(String source, Exception e) {
|
|
|
+ logger.error((Supplier<?>) () -> new ParameterizedMessage("unexpected failure during [{}]", source), e);
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
// visible for testing
|
|
|
static class NodeRemovalClusterStateTaskExecutor implements ClusterStateTaskExecutor<NodeRemovalClusterStateTaskExecutor.Task>, ClusterStateTaskListener {
|
|
|
|
|
|
private final AllocationService allocationService;
|
|
|
private final ElectMasterService electMasterService;
|
|
|
- private final BiFunction<ClusterState, String, ClusterState> rejoin;
|
|
|
+ private final Consumer<String> rejoin;
|
|
|
private final Logger logger;
|
|
|
|
|
|
static class Task {
|
|
@@ -540,7 +545,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|
|
NodeRemovalClusterStateTaskExecutor(
|
|
|
final AllocationService allocationService,
|
|
|
final ElectMasterService electMasterService,
|
|
|
- final BiFunction<ClusterState, String, ClusterState> rejoin,
|
|
|
+ final Consumer<String> rejoin,
|
|
|
final Logger logger) {
|
|
|
this.allocationService = allocationService;
|
|
|
this.electMasterService = electMasterService;
|
|
@@ -570,7 +575,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|
|
|
|
|
final BatchResult.Builder<Task> resultBuilder = BatchResult.<Task>builder().successes(tasks);
|
|
|
if (!electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes())) {
|
|
|
- return resultBuilder.build(rejoin.apply(remainingNodesClusterState, "not enough master nodes"));
|
|
|
+ rejoin.accept("not enough master nodes");
|
|
|
+ return resultBuilder.build(currentState);
|
|
|
} else {
|
|
|
return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
|
|
|
}
|