|
@@ -34,14 +34,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.routing.RerouteService;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|
|
import org.elasticsearch.cluster.service.MasterService;
|
|
|
-import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
-import org.elasticsearch.common.settings.Setting;
|
|
|
-import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
-import org.elasticsearch.discovery.DiscoveryModule;
|
|
|
import org.elasticsearch.monitor.NodeHealthService;
|
|
|
import org.elasticsearch.monitor.StatusInfo;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
@@ -49,7 +45,6 @@ import org.elasticsearch.threadpool.ThreadPool.Names;
|
|
|
import org.elasticsearch.transport.TransportChannel;
|
|
|
import org.elasticsearch.transport.TransportException;
|
|
|
import org.elasticsearch.transport.TransportRequest;
|
|
|
-import org.elasticsearch.transport.TransportRequestOptions;
|
|
|
import org.elasticsearch.transport.TransportResponse;
|
|
|
import org.elasticsearch.transport.TransportResponse.Empty;
|
|
|
import org.elasticsearch.transport.TransportResponseHandler;
|
|
@@ -81,32 +76,22 @@ public class JoinHelper {
|
|
|
public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate";
|
|
|
public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join";
|
|
|
|
|
|
- // the timeout for each join attempt
|
|
|
- public static final Setting<TimeValue> JOIN_TIMEOUT_SETTING =
|
|
|
- Setting.timeSetting("cluster.join.timeout",
|
|
|
- TimeValue.timeValueMillis(60000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);
|
|
|
-
|
|
|
private final MasterService masterService;
|
|
|
private final TransportService transportService;
|
|
|
private final JoinTaskExecutor joinTaskExecutor;
|
|
|
-
|
|
|
- @Nullable // if using single-node discovery
|
|
|
- private final TimeValue joinTimeout;
|
|
|
private final NodeHealthService nodeHealthService;
|
|
|
|
|
|
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
|
|
|
+ private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
|
|
|
|
|
|
- private AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
|
|
|
-
|
|
|
- JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
|
|
|
- TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
|
|
|
+ JoinHelper(AllocationService allocationService, MasterService masterService, TransportService transportService,
|
|
|
+ LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
|
|
|
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
|
|
|
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService,
|
|
|
NodeHealthService nodeHealthService) {
|
|
|
this.masterService = masterService;
|
|
|
this.transportService = transportService;
|
|
|
this.nodeHealthService = nodeHealthService;
|
|
|
- this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings);
|
|
|
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, rerouteService) {
|
|
|
|
|
|
@Override
|
|
@@ -249,7 +234,6 @@ public class JoinHelper {
|
|
|
if (pendingOutgoingJoins.add(dedupKey)) {
|
|
|
logger.debug("attempting to join {} with {}", destination, joinRequest);
|
|
|
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
|
|
|
- TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
|
|
|
new TransportResponseHandler<Empty>() {
|
|
|
@Override
|
|
|
public Empty read(StreamInput in) {
|
|
@@ -309,10 +293,8 @@ public class JoinHelper {
|
|
|
}
|
|
|
|
|
|
void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
|
|
|
- transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME,
|
|
|
- new ValidateJoinRequest(state),
|
|
|
- TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
|
|
|
- new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC));
|
|
|
+ transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME, new ValidateJoinRequest(state),
|
|
|
+ new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC));
|
|
|
}
|
|
|
|
|
|
public interface JoinCallback {
|