|
@@ -43,8 +43,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
-import org.elasticsearch.cluster.service.ClusterStateStatus;
|
|
|
import org.elasticsearch.cluster.service.ClusterServiceState;
|
|
|
+import org.elasticsearch.cluster.service.ClusterStateStatus;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.common.Strings;
|
|
@@ -187,13 +187,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
}
|
|
|
|
|
|
private List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
|
|
|
- ExecutionException, InterruptedException {
|
|
|
+ ExecutionException, InterruptedException {
|
|
|
configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
|
|
- List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
|
|
|
+ List<String> nodes = internalCluster().startNodes(numberOfNodes);
|
|
|
ensureStableCluster(numberOfNodes);
|
|
|
|
|
|
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
|
|
|
- ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
|
|
|
+ ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
|
|
if (zenPing instanceof UnicastZenPing) {
|
|
|
((UnicastZenPing) zenPing).clearTemporalResponses();
|
|
|
}
|
|
@@ -201,16 +201,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
}
|
|
|
|
|
|
static final Settings DEFAULT_SETTINGS = Settings.builder()
|
|
|
- .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
|
|
|
- .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
|
|
|
- .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
|
|
|
- .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
|
|
|
- .put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
|
|
|
- // value and the time of disruption and does not recover immediately
|
|
|
- // when disruption is stop. We should make sure we recover faster
|
|
|
- // then the default of 30s, causing ensureGreen and friends to time out
|
|
|
+ .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
|
|
|
+ .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
|
|
|
+ .put("discovery.zen.join_timeout", "10s") // still long to induce failures but not too long so test won't time out
|
|
|
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
|
|
|
+ .put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
|
|
|
+ // value and the time of disruption and does not recover immediately
|
|
|
+ // when disruption is stopped. We should make sure we recover faster
|
|
|
+ // than the default of 30s, causing ensureGreen and friends to time out
|
|
|
|
|
|
- .build();
|
|
|
+ .build();
|
|
|
|
|
|
@Override
|
|
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
|
@@ -237,10 +237,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
logger.info("---> configured unicast");
|
|
|
// TODO: Rarely use default settings form some of these
|
|
|
Settings nodeSettings = Settings.builder()
|
|
|
- .put(settings)
|
|
|
- .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
|
|
|
- .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
|
|
|
- .build();
|
|
|
+ .put(settings)
|
|
|
+ .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
|
|
|
+ .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
|
|
|
+ .build();
|
|
|
|
|
|
if (discoveryConfig == null) {
|
|
|
if (unicastHostsOrdinals == null) {
|
|
@@ -306,8 +306,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
logger.info("--> reducing min master nodes to 2");
|
|
|
assertAcked(client().admin().cluster().prepareUpdateSettings()
|
|
|
- .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2))
|
|
|
- .get());
|
|
|
+ .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2))
|
|
|
+ .get());
|
|
|
|
|
|
String master = internalCluster().getMasterName();
|
|
|
String nonMaster = null;
|
|
@@ -334,8 +334,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
// Makes sure that the get request can be executed on each node locally:
|
|
|
assertAcked(prepareCreate("test").setSettings(Settings.builder()
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
|
|
));
|
|
|
|
|
|
// Everything is stable now, it is now time to simulate evil...
|
|
@@ -376,7 +376,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
}
|
|
|
if (!success) {
|
|
|
fail("node [" + node + "] has no master or has blocks, despite of being on the right side of the partition. State dump:\n"
|
|
|
- + nodeState);
|
|
|
+ + nodeState);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -388,8 +388,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
logger.info("Verify no master block with {} set to {}", DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all");
|
|
|
client().admin().cluster().prepareUpdateSettings()
|
|
|
- .setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"))
|
|
|
- .get();
|
|
|
+ .setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"))
|
|
|
+ .get();
|
|
|
|
|
|
networkDisruption.startDisrupting();
|
|
|
|
|
@@ -416,10 +416,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
final List<String> nodes = startCluster(3);
|
|
|
|
|
|
assertAcked(prepareCreate("test")
|
|
|
- .setSettings(Settings.builder()
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
|
|
- ));
|
|
|
+ .setSettings(Settings.builder()
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
|
|
+ ));
|
|
|
|
|
|
ensureGreen();
|
|
|
String isolatedNode = internalCluster().getMasterName();
|
|
@@ -440,7 +440,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
for (String node : nodes) {
|
|
|
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()),
|
|
|
- true, node);
|
|
|
+ true, node);
|
|
|
}
|
|
|
|
|
|
logger.info("issue a reroute");
|
|
@@ -468,8 +468,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
}
|
|
|
} catch (AssertionError t) {
|
|
|
fail("failed comparing cluster state: " + t.getMessage() + "\n" +
|
|
|
- "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state +
|
|
|
- "\n--- cluster state [" + node + "]: ---\n" + nodeState);
|
|
|
+ "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state +
|
|
|
+ "\n--- cluster state [" + node + "]: ---\n" + nodeState);
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -482,7 +482,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
* This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates
|
|
|
*/
|
|
|
@TestLogging("_root:DEBUG,org.elasticsearch.action.index:TRACE,org.elasticsearch.action.get:TRACE,discovery:TRACE,org.elasticsearch.cluster.service:TRACE,"
|
|
|
- + "org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.indices.cluster:TRACE")
|
|
|
+ + "org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.indices.cluster:TRACE")
|
|
|
public void testAckedIndexing() throws Exception {
|
|
|
|
|
|
final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;
|
|
@@ -491,10 +491,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
final List<String> nodes = startCluster(rarely() ? 5 : 3);
|
|
|
|
|
|
assertAcked(prepareCreate("test")
|
|
|
- .setSettings(Settings.builder()
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
|
|
- ));
|
|
|
+ .setSettings(Settings.builder()
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
|
|
+ ));
|
|
|
ensureGreen();
|
|
|
|
|
|
ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
|
|
@@ -530,7 +530,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
|
|
|
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
|
|
|
IndexResponse response =
|
|
|
- client.prepareIndex("test", "type", id).setSource("{}").setTimeout(timeout).get(timeout);
|
|
|
+ client.prepareIndex("test", "type", id).setSource("{}").setTimeout(timeout).get(timeout);
|
|
|
assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
|
|
|
ackedDocs.put(id, node);
|
|
|
logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
|
|
@@ -584,7 +584,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
disruptionScheme.stopDisrupting();
|
|
|
for (String node : internalCluster().getNodeNames()) {
|
|
|
ensureStableCluster(nodes.size(), TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() +
|
|
|
- DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
|
|
|
+ DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
|
|
|
}
|
|
|
ensureGreen("test");
|
|
|
|
|
@@ -594,7 +594,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
|
|
|
for (String id : ackedDocs.keySet()) {
|
|
|
assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
|
|
|
- client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
|
|
|
+ client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
|
|
|
}
|
|
|
} catch (AssertionError e) {
|
|
|
throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
|
|
@@ -684,7 +684,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
// Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
|
|
|
final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String,
|
|
|
- String>>>());
|
|
|
+ String>>>());
|
|
|
for (final String node : majoritySide) {
|
|
|
masters.put(node, new ArrayList<Tuple<String, String>>());
|
|
|
internalCluster().getInstance(ClusterService.class, node).add(new ClusterStateListener() {
|
|
@@ -694,7 +694,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
|
|
|
if (!Objects.equals(previousMaster, currentMaster)) {
|
|
|
logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(),
|
|
|
- event.previousState());
|
|
|
+ event.previousState());
|
|
|
String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null;
|
|
|
String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null;
|
|
|
masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
|
|
@@ -739,17 +739,17 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
// The old master node will send this update + the cluster state where he is flagged as master to the other
|
|
|
// nodes that follow the new master. These nodes should ignore this update.
|
|
|
internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new
|
|
|
- ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
|
|
- @Override
|
|
|
- public ClusterState execute(ClusterState currentState) throws Exception {
|
|
|
- return ClusterState.builder(currentState).build();
|
|
|
- }
|
|
|
+ ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
|
|
+ @Override
|
|
|
+ public ClusterState execute(ClusterState currentState) throws Exception {
|
|
|
+ return ClusterState.builder(currentState).build();
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void onFailure(String source, Exception e) {
|
|
|
- logger.warn((Supplier<?>) () -> new ParameterizedMessage("failure [{}]", source), e);
|
|
|
- }
|
|
|
- });
|
|
|
+ @Override
|
|
|
+ public void onFailure(String source, Exception e) {
|
|
|
+ logger.warn((Supplier<?>) () -> new ParameterizedMessage("failure [{}]", source), e);
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
// Save the new elected master node
|
|
|
final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0));
|
|
@@ -769,15 +769,15 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
String nodeName = entry.getKey();
|
|
|
List<Tuple<String, String>> recordedMasterTransition = entry.getValue();
|
|
|
assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(),
|
|
|
- equalTo(2));
|
|
|
+ equalTo(2));
|
|
|
assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(),
|
|
|
- equalTo(oldMasterNode));
|
|
|
+ equalTo(oldMasterNode));
|
|
|
assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition
|
|
|
- .get(0).v2(), nullValue());
|
|
|
+ .get(0).v2(), nullValue());
|
|
|
assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(),
|
|
|
- nullValue());
|
|
|
+ nullValue());
|
|
|
assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]",
|
|
|
- recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
|
|
|
+ recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -789,11 +789,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
List<String> nodes = startCluster(3);
|
|
|
|
|
|
assertAcked(prepareCreate("test")
|
|
|
- .setSettings(Settings.builder()
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
|
|
- )
|
|
|
- .get());
|
|
|
+ .setSettings(Settings.builder()
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
|
|
+ )
|
|
|
+ .get());
|
|
|
ensureGreen("test");
|
|
|
|
|
|
nodes = new ArrayList<>(nodes);
|
|
@@ -809,13 +809,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
|
|
|
IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value")
|
|
|
- .get();
|
|
|
+ .get();
|
|
|
assertThat(indexResponse.getVersion(), equalTo(1L));
|
|
|
|
|
|
logger.info("Verifying if document exists via node[{}]", notIsolatedNode);
|
|
|
GetResponse getResponse = internalCluster().client(notIsolatedNode).prepareGet("test", "type", indexResponse.getId())
|
|
|
- .setPreference("_local")
|
|
|
- .get();
|
|
|
+ .setPreference("_local")
|
|
|
+ .get();
|
|
|
assertThat(getResponse.isExists(), is(true));
|
|
|
assertThat(getResponse.getVersion(), equalTo(1L));
|
|
|
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
|
|
@@ -828,8 +828,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
for (String node : nodes) {
|
|
|
logger.info("Verifying if document exists after isolating node[{}] via node[{}]", isolatedNode, node);
|
|
|
getResponse = internalCluster().client(node).prepareGet("test", "type", indexResponse.getId())
|
|
|
- .setPreference("_local")
|
|
|
- .get();
|
|
|
+ .setPreference("_local")
|
|
|
+ .get();
|
|
|
assertThat(getResponse.isExists(), is(true));
|
|
|
assertThat(getResponse.getVersion(), equalTo(1L));
|
|
|
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
|
|
@@ -853,7 +853,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
|
|
|
// includes all the other nodes that have pinged it and the issue doesn't manifest
|
|
|
- ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
|
|
|
+ ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
|
|
if (zenPing instanceof UnicastZenPing) {
|
|
|
((UnicastZenPing) zenPing).clearTemporalResponses();
|
|
|
}
|
|
@@ -890,7 +890,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
|
|
|
// includes all the other nodes that have pinged it and the issue doesn't manifest
|
|
|
- ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
|
|
|
+ ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
|
|
if (zenPing instanceof UnicastZenPing) {
|
|
|
((UnicastZenPing) zenPing).clearTemporalResponses();
|
|
|
}
|
|
@@ -928,11 +928,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
|
|
|
|
|
|
TransportService masterTranspotService =
|
|
|
- internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName());
|
|
|
+ internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName());
|
|
|
|
|
|
logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
|
|
|
MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
|
|
|
- nonMasterNode);
|
|
|
+ nonMasterNode);
|
|
|
nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService);
|
|
|
|
|
|
assertNoMaster(nonMasterNode);
|
|
@@ -951,10 +951,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
|
|
|
final CountDownLatch countDownLatch = new CountDownLatch(2);
|
|
|
nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService
|
|
|
- .original()) {
|
|
|
+ .original()) {
|
|
|
@Override
|
|
|
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions
|
|
|
- options) throws IOException, TransportException {
|
|
|
+ options) throws IOException, TransportException {
|
|
|
if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
|
|
|
countDownLatch.countDown();
|
|
|
}
|
|
@@ -982,16 +982,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
|
|
|
String nonMasterNode = randomFrom(nonMasterNodes);
|
|
|
assertAcked(prepareCreate("test")
|
|
|
- .setSettings(Settings.builder()
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
|
|
- ));
|
|
|
+ .setSettings(Settings.builder()
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
|
|
+ ));
|
|
|
ensureGreen();
|
|
|
String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
|
|
|
|
|
|
// fail a random shard
|
|
|
ShardRouting failedShard =
|
|
|
- randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
|
|
|
+ randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
|
|
|
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
AtomicBoolean success = new AtomicBoolean();
|
|
@@ -1006,20 +1006,20 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
networkDisruption.startDisrupting();
|
|
|
|
|
|
service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
|
|
|
- ShardStateAction.Listener() {
|
|
|
- @Override
|
|
|
- public void onSuccess() {
|
|
|
- success.set(true);
|
|
|
- latch.countDown();
|
|
|
- }
|
|
|
+ ShardStateAction.Listener() {
|
|
|
+ @Override
|
|
|
+ public void onSuccess() {
|
|
|
+ success.set(true);
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- success.set(false);
|
|
|
- latch.countDown();
|
|
|
- assert false;
|
|
|
- }
|
|
|
- });
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ success.set(false);
|
|
|
+ latch.countDown();
|
|
|
+ assert false;
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
if (isolatedNode.equals(nonMasterNode)) {
|
|
|
assertNoMaster(nonMasterNode);
|
|
@@ -1051,11 +1051,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);
|
|
|
|
|
|
// don't wait for initial state, wat want to add the disruption while the cluster is forming..
|
|
|
- internalCluster().startNodesAsync(3,
|
|
|
- Settings.builder()
|
|
|
- .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "1ms")
|
|
|
- .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "3s")
|
|
|
- .build()).get();
|
|
|
+ internalCluster().startNodes(3, Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "3s").build());
|
|
|
|
|
|
logger.info("applying disruption while cluster is forming ...");
|
|
|
|
|
@@ -1084,7 +1080,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
|
|
|
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
|
|
|
- masterNode);
|
|
|
+ masterNode);
|
|
|
if (randomBoolean()) {
|
|
|
masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
|
|
|
} else {
|
|
@@ -1110,21 +1106,18 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
|
|
|
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
|
|
|
configureCluster(Settings.EMPTY, 3, null, 1);
|
|
|
- InternalTestCluster.Async<String> masterNodeFuture = internalCluster().startMasterOnlyNodeAsync();
|
|
|
- InternalTestCluster.Async<String> node_1Future = internalCluster().startDataOnlyNodeAsync();
|
|
|
+ final String masterNode = internalCluster().startMasterOnlyNode();
|
|
|
+ final String node_1 = internalCluster().startDataOnlyNode();
|
|
|
|
|
|
- final String node_1 = node_1Future.get();
|
|
|
- final String masterNode = masterNodeFuture.get();
|
|
|
logger.info("--> creating index [test] with one shard and on replica");
|
|
|
assertAcked(prepareCreate("test").setSettings(
|
|
|
- Settings.builder().put(indexSettings())
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
- .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
|
|
|
+ Settings.builder().put(indexSettings())
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
+ .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
|
|
|
);
|
|
|
ensureGreen("test");
|
|
|
|
|
|
- InternalTestCluster.Async<String> node_2Future = internalCluster().startDataOnlyNodeAsync();
|
|
|
- final String node_2 = node_2Future.get();
|
|
|
+ final String node_2 = internalCluster().startDataOnlyNode();
|
|
|
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
|
|
|
for (int i = 0; i < 100; i++) {
|
|
|
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("doc").setSource("{\"int_field\":1}"));
|
|
@@ -1137,7 +1130,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
CountDownLatch beginRelocationLatch = new CountDownLatch(1);
|
|
|
CountDownLatch endRelocationLatch = new CountDownLatch(1);
|
|
|
transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch,
|
|
|
- endRelocationLatch));
|
|
|
+ endRelocationLatch));
|
|
|
internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get();
|
|
|
// wait for relocation to start
|
|
|
beginRelocationLatch.await();
|
|
@@ -1176,21 +1169,19 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
*/
|
|
|
public void testIndicesDeleted() throws Exception {
|
|
|
final Settings settings = Settings.builder()
|
|
|
- .put(DEFAULT_SETTINGS)
|
|
|
- .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
|
|
|
- .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
|
|
|
- .build();
|
|
|
+ .put(DEFAULT_SETTINGS)
|
|
|
+ .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
|
|
|
+ .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
|
|
|
+ .build();
|
|
|
final String idxName = "test";
|
|
|
configureCluster(settings, 3, null, 2);
|
|
|
- InternalTestCluster.Async<List<String>> masterNodes = internalCluster().startMasterOnlyNodesAsync(2);
|
|
|
- InternalTestCluster.Async<String> dataNode = internalCluster().startDataOnlyNodeAsync();
|
|
|
- dataNode.get();
|
|
|
- final List<String> allMasterEligibleNodes = masterNodes.get();
|
|
|
+ final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2);
|
|
|
+ final String dataNode = internalCluster().startDataOnlyNode();
|
|
|
ensureStableCluster(3);
|
|
|
assertAcked(prepareCreate("test"));
|
|
|
|
|
|
final String masterNode1 = internalCluster().getMasterName();
|
|
|
- NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(masterNode1, dataNode.get()),
|
|
|
+ NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(masterNode1, dataNode),
|
|
|
new NetworkUnresponsive());
|
|
|
internalCluster().setDisruptionScheme(networkDisruption);
|
|
|
networkDisruption.startDisrupting();
|
|
@@ -1202,7 +1193,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
for (String masterNode : allMasterEligibleNodes) {
|
|
|
final ClusterServiceState masterState = internalCluster().clusterService(masterNode).clusterServiceState();
|
|
|
assertTrue("index not deleted on " + masterNode, masterState.getClusterState().metaData().hasIndex(idxName) == false &&
|
|
|
- masterState.getClusterStateStatus() == ClusterStateStatus.APPLIED);
|
|
|
+ masterState.getClusterStateStatus() == ClusterStateStatus.APPLIED);
|
|
|
}
|
|
|
});
|
|
|
internalCluster().restartNode(masterNode1, InternalTestCluster.EMPTY_CALLBACK);
|
|
@@ -1212,21 +1203,21 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
public void testElectMasterWithLatestVersion() throws Exception {
|
|
|
configureCluster(3, null, 2);
|
|
|
- final Set<String> nodes = new HashSet<>(internalCluster().startNodesAsync(3).get());
|
|
|
+ final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
|
|
|
ensureStableCluster(3);
|
|
|
ServiceDisruptionScheme isolateAllNodes = new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect());
|
|
|
internalCluster().setDisruptionScheme(isolateAllNodes);
|
|
|
|
|
|
logger.info("--> forcing a complete election to make sure \"preferred\" master is elected");
|
|
|
isolateAllNodes.startDisrupting();
|
|
|
- for (String node: nodes) {
|
|
|
+ for (String node : nodes) {
|
|
|
assertNoMaster(node);
|
|
|
}
|
|
|
isolateAllNodes.stopDisrupting();
|
|
|
ensureStableCluster(3);
|
|
|
final String preferredMasterName = internalCluster().getMasterName();
|
|
|
final DiscoveryNode preferredMaster = internalCluster().clusterService(preferredMasterName).localNode();
|
|
|
- for (String node: nodes) {
|
|
|
+ for (String node : nodes) {
|
|
|
DiscoveryNode discoveryNode = internalCluster().clusterService(node).localNode();
|
|
|
assertThat(discoveryNode.getId(), greaterThanOrEqualTo(preferredMaster.getId()));
|
|
|
}
|
|
@@ -1252,7 +1243,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
|
|
|
logger.info("--> forcing a complete election again");
|
|
|
isolateAllNodes.startDisrupting();
|
|
|
- for (String node: nodes) {
|
|
|
+ for (String node : nodes) {
|
|
|
assertNoMaster(node);
|
|
|
}
|
|
|
|
|
@@ -1298,10 +1289,17 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
}
|
|
|
final NetworkLinkDisruptionType disruptionType;
|
|
|
switch (randomInt(2)) {
|
|
|
- case 0: disruptionType = new NetworkUnresponsive(); break;
|
|
|
- case 1: disruptionType = new NetworkDisconnect(); break;
|
|
|
- case 2: disruptionType = NetworkDelay.random(random()); break;
|
|
|
- default: throw new IllegalArgumentException();
|
|
|
+ case 0:
|
|
|
+ disruptionType = new NetworkUnresponsive();
|
|
|
+ break;
|
|
|
+ case 1:
|
|
|
+ disruptionType = new NetworkDisconnect();
|
|
|
+ break;
|
|
|
+ case 2:
|
|
|
+ disruptionType = NetworkDelay.random(random());
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throw new IllegalArgumentException();
|
|
|
}
|
|
|
final ServiceDisruptionScheme scheme;
|
|
|
if (rarely()) {
|
|
@@ -1334,7 +1332,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
if (expectedBlocks != null) {
|
|
|
for (ClusterBlockLevel level : expectedBlocks.levels()) {
|
|
|
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
|
|
|
- (level));
|
|
|
+ (level));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1352,7 +1350,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
}
|
|
|
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
|
|
|
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
|
|
|
- oldMasterNode, not(equalTo(masterNode)));
|
|
|
+ oldMasterNode, not(equalTo(masterNode)));
|
|
|
}
|
|
|
}, 10, TimeUnit.SECONDS);
|
|
|
}
|
|
@@ -1372,12 +1370,12 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|
|
private void assertDiscoveryCompleted(List<String> nodes) throws InterruptedException {
|
|
|
for (final String node : nodes) {
|
|
|
assertTrue(
|
|
|
- "node [" + node + "] is still joining master",
|
|
|
- awaitBusy(
|
|
|
- () -> !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster(),
|
|
|
- 30,
|
|
|
- TimeUnit.SECONDS
|
|
|
- )
|
|
|
+ "node [" + node + "] is still joining master",
|
|
|
+ awaitBusy(
|
|
|
+ () -> !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster(),
|
|
|
+ 30,
|
|
|
+ TimeUnit.SECONDS
|
|
|
+ )
|
|
|
);
|
|
|
}
|
|
|
}
|