|
@@ -164,7 +164,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|
|
this.masterService = masterService;
|
|
|
this.allocationService = allocationService;
|
|
|
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
|
|
|
- this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
|
|
|
+ this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
|
|
|
this.electionStrategy = electionStrategy;
|
|
|
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
|
|
|
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
|
|
@@ -1217,6 +1217,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|
|
private final AckListener ackListener;
|
|
|
private final ActionListener<Void> publishListener;
|
|
|
private final PublicationTransportHandler.PublicationContext publicationContext;
|
|
|
+
|
|
|
+ @Nullable // if using single-node discovery
|
|
|
private final Scheduler.ScheduledCancellable timeoutHandler;
|
|
|
private final Scheduler.Cancellable infoTimeoutHandler;
|
|
|
|
|
@@ -1260,7 +1262,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|
|
this.ackListener = ackListener;
|
|
|
this.publishListener = publishListener;
|
|
|
|
|
|
- this.timeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
|
|
|
+ this.timeoutHandler = singleNodeDiscovery ? null : transportService.getThreadPool().schedule(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
synchronized (mutex) {
|
|
@@ -1328,8 +1330,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|
|
synchronized (mutex) {
|
|
|
removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
|
|
|
}
|
|
|
- timeoutHandler.cancel();
|
|
|
- infoTimeoutHandler.cancel();
|
|
|
+ cancelTimeoutHandlers();
|
|
|
ackListener.onNodeAck(getLocalNode(), e);
|
|
|
publishListener.onFailure(e);
|
|
|
}
|
|
@@ -1376,8 +1377,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|
|
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
|
|
|
logIncompleteNodes(Level.WARN);
|
|
|
}
|
|
|
- timeoutHandler.cancel();
|
|
|
- infoTimeoutHandler.cancel();
|
|
|
+ cancelTimeoutHandlers();
|
|
|
ackListener.onNodeAck(getLocalNode(), null);
|
|
|
publishListener.onResponse(null);
|
|
|
}
|
|
@@ -1388,8 +1388,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|
|
public void onFailure(Exception e) {
|
|
|
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
|
|
|
removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");
|
|
|
- timeoutHandler.cancel();
|
|
|
- infoTimeoutHandler.cancel();
|
|
|
+ cancelTimeoutHandlers();
|
|
|
|
|
|
final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e);
|
|
|
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
|
|
@@ -1398,6 +1397,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|
|
}, EsExecutors.newDirectExecutorService(), transportService.getThreadPool().getThreadContext());
|
|
|
}
|
|
|
|
|
|
+ private void cancelTimeoutHandlers() {
|
|
|
+ if (timeoutHandler != null) {
|
|
|
+ timeoutHandler.cancel();
|
|
|
+ }
|
|
|
+ infoTimeoutHandler.cancel();
|
|
|
+ }
|
|
|
+
|
|
|
private void handleAssociatedJoin(Join join) {
|
|
|
if (join.getTerm() == getCurrentTerm() && missingJoinVoteFrom(join.getSourceNode())) {
|
|
|
logger.trace("handling {}", join);
|