|
@@ -67,6 +67,8 @@ import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
import org.elasticsearch.test.disruption.DisruptableMockTransport;
|
|
|
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
+import org.elasticsearch.transport.TransportInterceptor;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.hamcrest.Matcher;
|
|
|
import org.hamcrest.core.IsCollectionContaining;
|
|
@@ -822,7 +824,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
|
|
|
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
|
|
|
transportService = mockTransport.createTransportService(
|
|
|
- settings, deterministicTaskQueue.getThreadPool(this::onNode), NOOP_TRANSPORT_INTERCEPTOR,
|
|
|
+ settings, deterministicTaskQueue.getThreadPool(this::onNode),
|
|
|
+ getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
|
|
|
a -> localNode, null, emptySet());
|
|
|
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
|
|
|
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
|
|
@@ -839,7 +842,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
|
|
|
allocationService, masterService, this::getPersistedState,
|
|
|
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {},
|
|
|
- ElectionStrategy.DEFAULT_INSTANCE);
|
|
|
+ getElectionStrategy());
|
|
|
masterService.setClusterStatePublisher(coordinator);
|
|
|
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
|
|
|
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);
|
|
@@ -1099,6 +1102,14 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ protected TransportInterceptor getTransportInterceptor(DiscoveryNode localNode, ThreadPool threadPool) {
|
|
|
+ return NOOP_TRANSPORT_INTERCEPTOR;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected ElectionStrategy getElectionStrategy() {
|
|
|
+ return ElectionStrategy.DEFAULT_INSTANCE;
|
|
|
+ }
|
|
|
+
|
|
|
public static final String NODE_ID_LOG_CONTEXT_KEY = "nodeId";
|
|
|
|
|
|
protected static String getNodeIdForLogContext(DiscoveryNode node) {
|