
Make NetworkPartition disruption scheme configurable (#19534)

This commit separates the description of which network links are to be disrupted from the failure that is applied to those links (disconnect/unresponsive/delay). Previously, each network disruption scheme was a dedicated subclass that combined both the failure mode (disconnect/unresponsive/delay) and the set of links to cut (two partitions / bridge partitioning) in a single class.
Yannick Welsch · 9 years ago · commit 522b137097
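
In practice the migrated tests now compose the two pieces explicitly. A minimal sketch of the pattern (the surrounding ESIntegTestCase scaffolding and the master / otherNodes variables are assumed, not shown here):

import java.util.Collections;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;

// which links to disrupt: the master on one side, every other node on the other
TwoPartitions partitions = new TwoPartitions(Collections.singleton(master), otherNodes);
// how those links fail: hard disconnects (requests fail with ConnectTransportException)
NetworkDisruption disruption = new NetworkDisruption(partitions, new NetworkDisconnect());
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
// ... exercise the cluster while the master is isolated ...
disruption.stopDisrupting();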

+ 6 - 3
core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java

@@ -27,8 +27,9 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService;
 import org.elasticsearch.discovery.zen.fd.FaultDetection;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
-import org.elasticsearch.test.disruption.NetworkPartition;
+import org.elasticsearch.test.disruption.NetworkDisruption;
+import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
+import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
 import org.elasticsearch.test.transport.MockTransportService;
 
 import java.util.Arrays;
@@ -112,7 +113,9 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase {
         Set<String> otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
         otherNodes.remove(master);
 
-        NetworkPartition partition = new NetworkDisconnectPartition(Collections.singleton(master), otherNodes, random());
+        NetworkDisruption partition = new NetworkDisruption(
+            new TwoPartitions(Collections.singleton(master), otherNodes),
+            new NetworkDisconnect());
         internalCluster().setDisruptionScheme(partition);
 
         logger.info("--> disrupting network");

+ 8 - 3
core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java

@@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.discovery.zen.ZenDiscovery;
@@ -36,7 +37,9 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
-import org.elasticsearch.test.disruption.NetworkDelaysPartition;
+import org.elasticsearch.test.disruption.NetworkDisruption;
+import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay;
+import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
 
@@ -372,12 +375,14 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
         final String master = internalCluster().getMasterName();
         Set<String> otherNodes = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
         otherNodes.remove(master);
-        NetworkDelaysPartition partition = new NetworkDelaysPartition(Collections.singleton(master), otherNodes, 60000, random());
+        NetworkDisruption partition = new NetworkDisruption(
+            new TwoPartitions(Collections.singleton(master), otherNodes),
+            new NetworkDelay(TimeValue.timeValueMinutes(1)));
         internalCluster().setDisruptionScheme(partition);
         partition.startDisrupting();
 
         final CountDownLatch latch = new CountDownLatch(1);
-        final AtomicReference<Throwable> failure = new AtomicReference<>();
+        final AtomicReference<Exception> failure = new AtomicReference<>();
         logger.debug("--> submitting for cluster state to be rejected");
         final ClusterService masterClusterService = internalCluster().clusterService(master);
         masterClusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {

+ 7 - 5
core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

@@ -28,17 +28,18 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimary
 import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
 import org.elasticsearch.common.collect.ImmutableOpenIntMap;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
+import org.elasticsearch.test.disruption.NetworkDisruption;
+import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
+import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
 import org.elasticsearch.test.transport.MockTransportService;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -85,8 +86,9 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
             replicaNode = state.getRoutingNodes().node(shards.get(0).currentNodeId()).node().getName();
         }
 
-        NetworkDisconnectPartition partition = new NetworkDisconnectPartition(
-            new HashSet<>(Arrays.asList(master, replicaNode)), Collections.singleton(primaryNode), random());
+        NetworkDisruption partition = new NetworkDisruption(
+            new TwoPartitions(Sets.newHashSet(master, replicaNode), Collections.singleton(primaryNode)),
+            new NetworkDisconnect());
         internalCluster().setDisruptionScheme(partition);
         logger.info("--> partitioning node with primary shard from rest of cluster");
         partition.startDisrupting();

+ 72 - 56
core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java

@@ -63,13 +63,16 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
 import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
-import org.elasticsearch.test.disruption.BridgePartition;
 import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
 import org.elasticsearch.test.disruption.LongGCDisruption;
-import org.elasticsearch.test.disruption.NetworkDelaysPartition;
-import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
-import org.elasticsearch.test.disruption.NetworkPartition;
-import org.elasticsearch.test.disruption.NetworkUnresponsivePartition;
+import org.elasticsearch.test.disruption.NetworkDisruption;
+import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
+import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay;
+import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
+import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
+import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive;
+import org.elasticsearch.test.disruption.NetworkDisruption.DisruptedLinks;
+import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.disruption.SingleNodeDisruption;
 import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
@@ -234,7 +237,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         // Simulate a network issue between the unlucky node and elected master node in both directions.
 
-        NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(masterNode, unluckyNode, random());
+        NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, unluckyNode),
+            new NetworkDisconnect());
         setDisruptionScheme(networkDisconnect);
         networkDisconnect.startDisrupting();
 
@@ -282,7 +286,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         }
 
         logger.info("--> isolating [{}]", nonMaster);
-        addRandomIsolation(nonMaster).startDisrupting();
+        TwoPartitions partitions = isolateNode(nonMaster);
+        NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
+        networkDisruption.startDisrupting();
 
         logger.info("--> waiting for master to remove it");
         ensureStableCluster(2, master);
@@ -305,15 +311,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         // (waiting for green here, because indexing / search in a yellow index is fine as long as no other nodes go down)
         ensureGreen("test");
 
-        NetworkPartition networkPartition = addRandomPartition();
+        TwoPartitions partitions = TwoPartitions.random(random(), internalCluster().getNodeNames());
+        NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
 
-        assertEquals(1, networkPartition.getMinoritySide().size());
-        final String isolatedNode = networkPartition.getMinoritySide().iterator().next();
-        assertEquals(2, networkPartition.getMajoritySide().size());
-        final String nonIsolatedNode = networkPartition.getMajoritySide().iterator().next();
+        assertEquals(1, partitions.getMinoritySide().size());
+        final String isolatedNode = partitions.getMinoritySide().iterator().next();
+        assertEquals(2, partitions.getMajoritySide().size());
+        final String nonIsolatedNode = partitions.getMajoritySide().iterator().next();
 
         // Simulate a network issue between the unlucky node and the rest of the cluster.
-        networkPartition.startDisrupting();
+        networkDisruption.startDisrupting();
 
 
         // The unlucky node must report *no* master node, since it can't connect to master and in fact it should
@@ -326,7 +333,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         logger.info("wait until elected master has been removed and a new 2 node cluster was from (via [{}])", isolatedNode);
         ensureStableCluster(2, nonIsolatedNode);
 
-        for (String node : networkPartition.getMajoritySide()) {
+        for (String node : partitions.getMajoritySide()) {
             ClusterState nodeState = getNodeClusterState(node);
             boolean success = true;
             if (nodeState.nodes().getMasterNode() == null) {
@@ -342,17 +349,17 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         }
 
 
-        networkPartition.stopDisrupting();
+        networkDisruption.stopDisrupting();
 
         // Wait until the master node sees all 3 nodes again.
-        ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()));
+        ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()));
 
         logger.info("Verify no master block with {} set to {}", DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all");
         client().admin().cluster().prepareUpdateSettings()
                 .setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"))
                 .get();
 
-        networkPartition.startDisrupting();
+        networkDisruption.startDisrupting();
 
 
         // The unlucky node must report *no* master node, since it can't connect to master and in fact it should
@@ -384,10 +391,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         ensureGreen();
         String isolatedNode = internalCluster().getMasterName();
-        NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
-        networkPartition.startDisrupting();
+        TwoPartitions partitions = isolateNode(isolatedNode);
+        NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
+        networkDisruption.startDisrupting();
 
-        String nonIsolatedNode = networkPartition.getMajoritySide().iterator().next();
+        String nonIsolatedNode = partitions.getMajoritySide().iterator().next();
 
         // make sure cluster reforms
         ensureStableCluster(2, nonIsolatedNode);
@@ -396,10 +404,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         assertNoMaster(isolatedNode, TimeValue.timeValueSeconds(40));
 
         // restore isolation
-        networkPartition.stopDisrupting();
+        networkDisruption.stopDisrupting();
 
         for (String node : nodes) {
-            ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()),
+            ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()),
                     true, node);
         }
 
@@ -753,7 +761,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         String isolatedNode = nodes.get(0);
         String notIsolatedNode = nodes.get(1);
 
-        ServiceDisruptionScheme scheme = addRandomIsolation(isolatedNode);
+        TwoPartitions partitions = isolateNode(isolatedNode);
+        NetworkDisruption scheme = addRandomDisruptionType(partitions);
         scheme.startDisrupting();
         ensureStableCluster(2, notIsolatedNode);
         assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut());
@@ -811,7 +820,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         }
 
         // Simulate a network issue between the unlucky node and elected master node in both directions.
-        NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(masterNode, isolatedNode, random());
+        NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, isolatedNode),
+            new NetworkDisconnect());
         setDisruptionScheme(networkDisconnect);
         networkDisconnect.startDisrupting();
         // Wait until the elected master has removed the unlucky node...
@@ -848,7 +858,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         }
 
         // Simulate a network issue between the unicast target node and the rest of the cluster
-        NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(unicastTargetSide, restOfClusterSide, random());
+        NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(unicastTargetSide, restOfClusterSide),
+            new NetworkDisconnect());
         setDisruptionScheme(networkDisconnect);
         networkDisconnect.startDisrupting();
         // Wait until the elected master has removed the unlucky node...
@@ -948,8 +959,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         AtomicBoolean success = new AtomicBoolean();
 
         String isolatedNode = randomBoolean() ? masterNode : nonMasterNode;
-        NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
-        networkPartition.startDisrupting();
+        TwoPartitions partitions = isolateNode(isolatedNode);
+        NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
+        networkDisruption.startDisrupting();
 
         service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
                 ShardStateAction.Listener() {
@@ -974,7 +986,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         }
 
         // heal the partition
-        networkPartition.removeAndEnsureHealthy(internalCluster());
+        networkDisruption.removeAndEnsureHealthy(internalCluster());
 
         // the cluster should stabilize
         ensureStableCluster(3);
@@ -1136,9 +1148,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         assertAcked(prepareCreate("test"));
 
         final String masterNode1 = internalCluster().getMasterName();
-        NetworkPartition networkPartition = new NetworkUnresponsivePartition(masterNode1, dataNode.get(), random());
-        internalCluster().setDisruptionScheme(networkPartition);
-        networkPartition.startDisrupting();
+        NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(masterNode1, dataNode.get()),
+            new NetworkUnresponsive());
+        internalCluster().setDisruptionScheme(networkDisruption);
+        networkDisruption.startDisrupting();
         // We know this will time out due to the partition, we check manually below to not proceed until
         // the delete has been applied to the master node and the master eligible node.
         internalCluster().client(masterNode1).admin().indices().prepareDelete(idxName).setTimeout("0s").get();
@@ -1155,49 +1168,52 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         assertFalse(client().admin().indices().prepareExists(idxName).get().isExists());
     }
 
-    protected NetworkPartition addRandomPartition() {
-        NetworkPartition partition;
+    protected NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) {
+        final NetworkLinkDisruptionType disruptionType;
         if (randomBoolean()) {
-            partition = new NetworkUnresponsivePartition(random());
+            disruptionType = new NetworkUnresponsive();
         } else {
-            partition = new NetworkDisconnectPartition(random());
+            disruptionType = new NetworkDisconnect();
         }
+        NetworkDisruption partition = new NetworkDisruption(partitions, disruptionType);
 
         setDisruptionScheme(partition);
 
         return partition;
     }
 
-    protected NetworkPartition addRandomIsolation(String isolatedNode) {
+    protected TwoPartitions isolateNode(String isolatedNode) {
         Set<String> side1 = new HashSet<>();
         Set<String> side2 = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
         side1.add(isolatedNode);
         side2.remove(isolatedNode);
 
-        NetworkPartition partition;
-        if (randomBoolean()) {
-            partition = new NetworkUnresponsivePartition(side1, side2, random());
-        } else {
-            partition = new NetworkDisconnectPartition(side1, side2, random());
-        }
-
-        internalCluster().setDisruptionScheme(partition);
-
-        return partition;
+        return new TwoPartitions(side1, side2);
     }
 
     private ServiceDisruptionScheme addRandomDisruptionScheme() {
         // TODO: add partial partitions
-        List<ServiceDisruptionScheme> list = Arrays.asList(
-                new NetworkUnresponsivePartition(random()),
-                new NetworkDelaysPartition(random()),
-                new NetworkDisconnectPartition(random()),
-                new SlowClusterStateProcessing(random()),
-                new BridgePartition(random(), randomBoolean())
-        );
-        Collections.shuffle(list, random());
-        setDisruptionScheme(list.get(0));
-        return list.get(0);
+        final DisruptedLinks disruptedLinks;
+        if (randomBoolean()) {
+            disruptedLinks = TwoPartitions.random(random(), internalCluster().getNodeNames());
+        } else {
+            disruptedLinks = Bridge.random(random(), internalCluster().getNodeNames());
+        }
+        final NetworkLinkDisruptionType disruptionType;
+        switch (randomInt(2)) {
+            case 0: disruptionType = new NetworkUnresponsive(); break;
+            case 1: disruptionType = new NetworkDisconnect(); break;
+            case 2: disruptionType = NetworkDelay.random(random()); break;
+            default: throw new IllegalArgumentException();
+        }
+        final ServiceDisruptionScheme scheme;
+        if (rarely()) {
+            scheme = new SlowClusterStateProcessing(random());
+        } else {
+            scheme = new NetworkDisruption(disruptedLinks, disruptionType);
+        }
+        setDisruptionScheme(scheme);
+        return scheme;
     }
 
     private ClusterState getNodeClusterState(String node) {

+ 0 - 74
test/framework/src/main/java/org/elasticsearch/test/disruption/BridgePartition.java

@@ -1,74 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.test.disruption;
-
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.test.transport.MockTransportService;
-
-import java.util.Random;
-
-import static org.elasticsearch.test.ESTestCase.randomFrom;
-
-/**
- * A partition that breaks the cluster into two groups of nodes. The two groups are fully isolated
- * with the exception of a single node that can see and be seen by all nodes in both groups.
- */
-public class BridgePartition extends NetworkPartition {
-
-    String bridgeNode;
-    final boolean unresponsive;
-
-    public BridgePartition(Random random, boolean unresponsive) {
-        super(random);
-        this.unresponsive = unresponsive;
-    }
-
-    @Override
-    public void applyToCluster(InternalTestCluster cluster) {
-        bridgeNode = randomFrom(random, cluster.getNodeNames());
-        this.cluster = cluster;
-        for (String node: cluster.getNodeNames()) {
-            if (node.equals(bridgeNode) == false) {
-                super.applyToNode(node, cluster);
-            }
-        }
-    }
-
-    @Override
-    public TimeValue expectedTimeToHeal() {
-        return TimeValue.timeValueSeconds(0);
-    }
-
-    @Override
-    void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
-        if (unresponsive) {
-            transportService1.addUnresponsiveRule(transportService2);
-            transportService2.addUnresponsiveRule(transportService1);
-        } else {
-            transportService1.addFailToSendNoConnectRule(transportService2);
-            transportService2.addFailToSendNoConnectRule(transportService1);
-        }
-    }
-
-    @Override
-    protected String getPartitionDescription() {
-        return "bridge (super connected node: [" + bridgeNode + "], unresponsive [" + unresponsive + "])";
-    }
-}

+ 0 - 94
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDelaysPartition.java

@@ -1,94 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.test.disruption;
-
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.test.transport.MockTransportService;
-
-import java.util.Random;
-import java.util.Set;
-
-public class NetworkDelaysPartition extends NetworkPartition {
-
-    static long DEFAULT_DELAY_MIN = 10000;
-    static long DEFAULT_DELAY_MAX = 90000;
-
-
-    final long delayMin;
-    final long delayMax;
-
-    TimeValue duration;
-
-    public NetworkDelaysPartition(Random random) {
-        this(random, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX);
-    }
-
-    public NetworkDelaysPartition(Random random, long delayMin, long delayMax) {
-        super(random);
-        this.delayMin = delayMin;
-        this.delayMax = delayMax;
-    }
-
-    public NetworkDelaysPartition(String node1, String node2, Random random) {
-        this(node1, node2, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random);
-    }
-
-    public NetworkDelaysPartition(String node1, String node2, long delayMin, long delayMax, Random random) {
-        super(node1, node2, random);
-        this.delayMin = delayMin;
-        this.delayMax = delayMax;
-    }
-
-    public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
-        this(nodesSideOne, nodesSideTwo, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX, random);
-    }
-
-    public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, long delay, Random random) {
-        this(nodesSideOne, nodesSideTwo, delay, delay, random);
-    }
-
-    public NetworkDelaysPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, long delayMin, long delayMax, Random random) {
-        super(nodesSideOne, nodesSideTwo, random);
-        this.delayMin = delayMin;
-        this.delayMax = delayMax;
-
-    }
-
-    @Override
-    public synchronized void startDisrupting() {
-        duration = new TimeValue(delayMin == delayMax ? delayMin : delayMin + random.nextInt((int) (delayMax - delayMin)));
-        super.startDisrupting();
-    }
-
-    @Override
-    void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
-        transportService1.addUnresponsiveRule(transportService1, duration);
-        transportService1.addUnresponsiveRule(transportService2, duration);
-    }
-
-    @Override
-    protected String getPartitionDescription() {
-        return "network delays for [" + duration + "]";
-    }
-
-    @Override
-    public TimeValue expectedTimeToHeal() {
-        return TimeValue.timeValueMillis(delayMax);
-    }
-}

+ 0 - 57
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisconnectPartition.java

@@ -1,57 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.test.disruption;
-
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.test.transport.MockTransportService;
-
-import java.util.Random;
-import java.util.Set;
-
-public class NetworkDisconnectPartition extends NetworkPartition {
-
-
-    public NetworkDisconnectPartition(Random random) {
-        super(random);
-    }
-
-    public NetworkDisconnectPartition(String node1, String node2, Random random) {
-        super(node1, node2, random);
-    }
-
-    public NetworkDisconnectPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
-        super(nodesSideOne, nodesSideTwo, random);
-    }
-
-    @Override
-    protected String getPartitionDescription() {
-        return "disconnected";
-    }
-
-    @Override
-    void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
-        transportService1.addFailToSendNoConnectRule(transportService2);
-        transportService2.addFailToSendNoConnectRule(transportService1);
-    }
-
-    @Override
-    public TimeValue expectedTimeToHeal() {
-        return TimeValue.timeValueSeconds(0);
-    }
-}

+ 453 - 0
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java

@@ -0,0 +1,453 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.disruption;
+
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Network disruptions are modeled using two components:
+ * 1) the {@link DisruptedLinks} represents the links in the network that are to be disrupted
+ * 2) the {@link NetworkLinkDisruptionType} represents the failure mode that is to be applied to the links
+ */
+public class NetworkDisruption implements ServiceDisruptionScheme {
+
+    private final ESLogger logger = Loggers.getLogger(NetworkDisruption.class);
+
+    private final DisruptedLinks disruptedLinks;
+    private final NetworkLinkDisruptionType networkLinkDisruptionType;
+
+    protected volatile InternalTestCluster cluster;
+    protected volatile boolean activeDisruption = false;
+
+    public NetworkDisruption(DisruptedLinks disruptedLinks, NetworkLinkDisruptionType networkLinkDisruptionType) {
+        this.disruptedLinks = disruptedLinks;
+        this.networkLinkDisruptionType = networkLinkDisruptionType;
+    }
+
+    @Override
+    public void applyToCluster(InternalTestCluster cluster) {
+        this.cluster = cluster;
+    }
+
+    @Override
+    public void removeFromCluster(InternalTestCluster cluster) {
+        stopDisrupting();
+    }
+
+    @Override
+    public void removeAndEnsureHealthy(InternalTestCluster cluster) {
+        removeFromCluster(cluster);
+        ensureNodeCount(cluster);
+    }
+
+    protected void ensureNodeCount(InternalTestCluster cluster) {
+        assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth()
+            .setWaitForNodes("" + cluster.size())
+            .setWaitForRelocatingShards(0)
+            .get().isTimedOut());
+    }
+
+    @Override
+    public synchronized void applyToNode(String node, InternalTestCluster cluster) {
+
+    }
+
+    @Override
+    public synchronized void removeFromNode(String node1, InternalTestCluster cluster) {
+        logger.info("stop disrupting node (disruption type: {}, disrupted links: {})", networkLinkDisruptionType, disruptedLinks);
+        applyToNodes(new String[]{ node1 }, cluster.getNodeNames(), networkLinkDisruptionType::removeDisruption);
+        applyToNodes(cluster.getNodeNames(), new String[]{ node1 }, networkLinkDisruptionType::removeDisruption);
+    }
+
+    @Override
+    public synchronized void testClusterClosed() {
+
+    }
+
+    @Override
+    public synchronized void startDisrupting() {
+        logger.info("start disrupting (disruption type: {}, disrupted links: {})", networkLinkDisruptionType, disruptedLinks);
+        applyToNodes(cluster.getNodeNames(), cluster.getNodeNames(), networkLinkDisruptionType::applyDisruption);
+        activeDisruption = true;
+    }
+
+    @Override
+    public synchronized void stopDisrupting() {
+        if (!activeDisruption) {
+            return;
+        }
+        logger.info("stop disrupting (disruption scheme: {}, disrupted links: {})", networkLinkDisruptionType, disruptedLinks);
+        applyToNodes(cluster.getNodeNames(), cluster.getNodeNames(), networkLinkDisruptionType::removeDisruption);
+        activeDisruption = false;
+    }
+
+    /**
+     * Applies action to all disrupted links between two sets of nodes.
+     */
+    private void applyToNodes(String[] nodes1, String[] nodes2, BiConsumer<MockTransportService, MockTransportService> consumer) {
+        for (String node1 : nodes1) {
+            if (disruptedLinks.nodes().contains(node1)) {
+                for (String node2 : nodes2) {
+                    if (disruptedLinks.nodes().contains(node2)) {
+                        if (node1.equals(node2) == false) {
+                            if (disruptedLinks.disrupt(node1, node2)) {
+                                consumer.accept(transport(node1), transport(node2));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public TimeValue expectedTimeToHeal() {
+        return networkLinkDisruptionType.expectedTimeToHeal();
+    }
+
+    private MockTransportService transport(String node) {
+        return (MockTransportService) cluster.getInstance(TransportService.class, node);
+    }
+
+    /**
+     * Represents a set of nodes with connections between nodes that are to be disrupted
+     */
+    public abstract static class DisruptedLinks {
+        private final Set<String> nodes;
+
+        protected DisruptedLinks(Set<String>... nodeSets) {
+            Set<String> allNodes = new HashSet<>();
+            for (Set<String> nodeSet : nodeSets) {
+                allNodes.addAll(nodeSet);
+            }
+            this.nodes = allNodes;
+        }
+
+        /**
+         * Set of all nodes that can participate in disruptions
+         */
+        public Set<String> nodes() {
+            return nodes;
+        }
+
+        /**
+         * Returns true iff network should be disrupted between the two nodes
+         */
+        public abstract boolean disrupt(String node1, String node2);
+    }
+
+    /**
+     * Creates two partitions with symmetric failures
+     */
+    public static class TwoPartitions extends DisruptedLinks {
+
+        protected final Set<String> nodesSideOne;
+        protected final Set<String> nodesSideTwo;
+
+        public TwoPartitions(String node1, String node2) {
+            this(Collections.singleton(node1), Collections.singleton(node2));
+        }
+
+        public TwoPartitions(Set<String> nodesSideOne, Set<String> nodesSideTwo) {
+            super(nodesSideOne, nodesSideTwo);
+            this.nodesSideOne = nodesSideOne;
+            this.nodesSideTwo = nodesSideTwo;
+            assert nodesSideOne.isEmpty() == false;
+            assert nodesSideTwo.isEmpty() == false;
+            assert Sets.haveEmptyIntersection(nodesSideOne, nodesSideTwo);
+        }
+
+        public static TwoPartitions random(Random random, String... nodes) {
+            return random(random, Sets.newHashSet(nodes));
+        }
+
+        public static TwoPartitions random(Random random, Set<String> nodes) {
+            assert nodes.size() >= 2 : "two partitions topology requires at least 2 nodes";
+            Set<String> nodesSideOne = new HashSet<>();
+            Set<String> nodesSideTwo = new HashSet<>();
+            for (String node : nodes) {
+                if (nodesSideOne.isEmpty()) {
+                    nodesSideOne.add(node);
+                } else if (nodesSideTwo.isEmpty()) {
+                    nodesSideTwo.add(node);
+                } else if (random.nextBoolean()) {
+                    nodesSideOne.add(node);
+                } else {
+                    nodesSideTwo.add(node);
+                }
+            }
+            return new TwoPartitions(nodesSideOne, nodesSideTwo);
+        }
+
+        @Override
+        public boolean disrupt(String node1, String node2) {
+            if (nodesSideOne.contains(node1) && nodesSideTwo.contains(node2)) {
+                return true;
+            }
+            if (nodesSideOne.contains(node2) && nodesSideTwo.contains(node1)) {
+                return true;
+            }
+            return false;
+        }
+
+        public Set<String> getNodesSideOne() {
+            return Collections.unmodifiableSet(nodesSideOne);
+        }
+
+        public Set<String> getNodesSideTwo() {
+            return Collections.unmodifiableSet(nodesSideTwo);
+        }
+
+        public Collection<String> getMajoritySide() {
+            if (nodesSideOne.size() >= nodesSideTwo.size()) {
+                return getNodesSideOne();
+            } else {
+                return getNodesSideTwo();
+            }
+        }
+
+        public Collection<String> getMinoritySide() {
+            if (nodesSideOne.size() >= nodesSideTwo.size()) {
+                return getNodesSideTwo();
+            } else {
+                return getNodesSideOne();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "two partitions (partition 1: " + nodesSideOne + " and partition 2: " + nodesSideTwo + ")";
+        }
+    }
+
+    /**
+     * Creates two partitions with symmetric failures and a bridge node that can connect to both of the partitions
+     */
+    public static class Bridge extends DisruptedLinks {
+
+        private final String bridgeNode;
+        private final Set<String> nodesSideOne;
+        private final Set<String> nodesSideTwo;
+
+        public Bridge(String bridgeNode, Set<String> nodesSideOne, Set<String> nodesSideTwo) {
+            super(Collections.singleton(bridgeNode), nodesSideOne, nodesSideTwo);
+            this.bridgeNode = bridgeNode;
+            this.nodesSideOne = nodesSideOne;
+            this.nodesSideTwo = nodesSideTwo;
+            assert nodesSideOne.isEmpty() == false;
+            assert nodesSideTwo.isEmpty() == false;
+            assert Sets.haveEmptyIntersection(nodesSideOne, nodesSideTwo);
+            assert nodesSideOne.contains(bridgeNode) == false && nodesSideTwo.contains(bridgeNode) == false;
+        }
+
+        public static Bridge random(Random random, String... nodes) {
+            return random(random, Sets.newHashSet(nodes));
+        }
+
+        public static Bridge random(Random random, Set<String> nodes) {
+            assert nodes.size() >= 3 : "bridge topology requires at least 3 nodes";
+            String bridgeNode = RandomPicks.randomFrom(random, nodes);
+            Set<String> nodesSideOne = new HashSet<>();
+            Set<String> nodesSideTwo = new HashSet<>();
+            for (String node : nodes) {
+                if (node.equals(bridgeNode) == false) {
+                    if (nodesSideOne.isEmpty()) {
+                        nodesSideOne.add(node);
+                    } else if (nodesSideTwo.isEmpty()) {
+                        nodesSideTwo.add(node);
+                    } else if (random.nextBoolean()) {
+                        nodesSideOne.add(node);
+                    } else {
+                        nodesSideTwo.add(node);
+                    }
+                }
+            }
+            return new Bridge(bridgeNode, nodesSideOne, nodesSideTwo);
+        }
+
+        @Override
+        public boolean disrupt(String node1, String node2) {
+            if (nodesSideOne.contains(node1) && nodesSideTwo.contains(node2)) {
+                return true;
+            }
+            if (nodesSideOne.contains(node2) && nodesSideTwo.contains(node1)) {
+                return true;
+            }
+            return false;
+        }
+
+        public String getBridgeNode() {
+            return bridgeNode;
+        }
+
+        public Set<String> getNodesSideOne() {
+            return nodesSideOne;
+        }
+
+        public Set<String> getNodesSideTwo() {
+            return nodesSideTwo;
+        }
+
+        public String toString() {
+            return "bridge partition (super connected node: [" + bridgeNode + "], partition 1: " + nodesSideOne +
+                " and partition 2: " + nodesSideTwo + ")";
+        }
+    }
+
+    /**
+     * Abstract class representing various types of network disruptions. Instances of this class override the {@link #applyDisruption}
+     * method to apply their specific disruption type to requests that are sent from a source to a target node.
+     */
+    public abstract static class NetworkLinkDisruptionType {
+
+        /**
+         * Applies network disruption for requests sent from the node represented by the source transport service to the node represented
+         * by the target transport service.
+         *
+         * @param sourceTransportService source transport service from which requests are sent
+         * @param targetTransportService target transport service to which requests are sent
+         */
+        public abstract void applyDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService);
+
+        /**
+         * Removes network disruption that was added by {@link #applyDisruption}.
+         *
+         * @param sourceTransportService source transport service from which requests are sent
+         * @param targetTransportService target transport service to which requests are sent
+         */
+        public void removeDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) {
+            sourceTransportService.clearRule(targetTransportService);
+        }
+
+        /**
+         * Returns expected time to heal after disruption has been removed. Defaults to instant healing.
+         */
+        public TimeValue expectedTimeToHeal() {
+            return TimeValue.timeValueMillis(0);
+        }
+    }
+
+    /**
+     * Simulates a network disconnect. Sending a request from source to target node throws a {@link ConnectTransportException}.
+     */
+    public static class NetworkDisconnect extends NetworkLinkDisruptionType {
+
+        @Override
+        public void applyDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) {
+            sourceTransportService.addFailToSendNoConnectRule(targetTransportService);
+        }
+
+        @Override
+        public String toString() {
+            return "network disconnects";
+        }
+    }
+
+    /**
+     * Simulates an unresponsive target node by dropping requests sent from source to target node.
+     */
+    public static class NetworkUnresponsive extends NetworkLinkDisruptionType {
+
+        @Override
+        public void applyDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) {
+            sourceTransportService.addUnresponsiveRule(targetTransportService);
+        }
+
+        @Override
+        public String toString() {
+            return "network unresponsive";
+        }
+    }
+
+    /**
+     * Simulates a slow or congested network. Delivery of requests that are sent from source to target node is delayed by a configurable
+     * time amount.
+     */
+    public static class NetworkDelay extends NetworkLinkDisruptionType {
+
+        public static TimeValue DEFAULT_DELAY_MIN = TimeValue.timeValueSeconds(10);
+        public static TimeValue DEFAULT_DELAY_MAX = TimeValue.timeValueSeconds(90);
+
+        private final TimeValue delay;
+
+        /**
+         * Delays requests by a fixed time value.
+         *
+         * @param delay time to delay requests
+         */
+        public NetworkDelay(TimeValue delay) {
+            this.delay = delay;
+        }
+
+        /**
+         * Delays requests by a random but fixed time value between {@link #DEFAULT_DELAY_MIN} and {@link #DEFAULT_DELAY_MAX}.
+         *
+         * @param random instance to use for randomization of delay
+         */
+        public static NetworkDelay random(Random random) {
+            return random(random, DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX);
+        }
+
+        /**
+         * Delays requests by a random but fixed time value between delayMin and delayMax.
+         *
+         * @param random   instance to use for randomization of delay
+         * @param delayMin minimum delay
+         * @param delayMax maximum delay
+         */
+        public static NetworkDelay random(Random random, TimeValue delayMin, TimeValue delayMax) {
+            return new NetworkDelay(TimeValue.timeValueMillis(delayMin.millis() == delayMax.millis() ?
+                    delayMin.millis() :
+                    delayMin.millis() + random.nextInt((int) (delayMax.millis() - delayMin.millis()))));
+        }
+
+        @Override
+        public void applyDisruption(MockTransportService sourceTransportService, MockTransportService targetTransportService) {
+            sourceTransportService.addUnresponsiveRule(targetTransportService, delay);
+        }
+
+        @Override
+        public TimeValue expectedTimeToHeal() {
+            return delay;
+        }
+
+        @Override
+        public String toString() {
+            return "network delays for [" + delay + "]";
+        }
+    }
+}
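
The other building blocks introduced in this file compose the same way. A hypothetical sketch combining the Bridge topology with the NetworkDelay failure mode (node names "node1"/"node2"/"node3" and the internalCluster() helper are assumptions for illustration, not taken from the commit):

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay;

// bridge topology: the two sides can only reach each other through "node3"
Bridge bridge = new Bridge("node3", Sets.newHashSet("node1"), Sets.newHashSet("node2"));
// every request crossing a disrupted link is delayed by a fixed 30 seconds
NetworkDisruption disruption = new NetworkDisruption(bridge, new NetworkDelay(TimeValue.timeValueSeconds(30)));
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
// ... exercise the cluster; expectedTimeToHeal() reports the 30s delay once stopped ...
disruption.stopDisrupting();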

+ 7 - 4
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartitionIT.java → test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java

@@ -22,12 +22,14 @@ package org.elasticsearch.test.disruption;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive;
+import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
 import org.elasticsearch.test.transport.MockTransportService;
 
 import java.io.IOException;
 import java.util.Collection;
 
-public class NetworkPartitionIT extends ESIntegTestCase {
+public class NetworkDisruptionIT extends ESIntegTestCase {
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return pluginList(MockTransportService.TestPlugin.class);
@@ -36,9 +38,10 @@ public class NetworkPartitionIT extends ESIntegTestCase {
     public void testNetworkPartitionWithNodeShutdown() throws IOException {
         internalCluster().ensureAtLeastNumDataNodes(2);
         String[] nodeNames = internalCluster().getNodeNames();
-        NetworkPartition networkPartition = new NetworkUnresponsivePartition(nodeNames[0], nodeNames[1], random());
-        internalCluster().setDisruptionScheme(networkPartition);
-        networkPartition.startDisrupting();
+        NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(nodeNames[0], nodeNames[1]),
+            new NetworkUnresponsive());
+        internalCluster().setDisruptionScheme(networkDisruption);
+        networkDisruption.startDisrupting();
         internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames[0]));
         internalCluster().clearDisruptionScheme();
     }

+ 102 - 0
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionTests.java

@@ -0,0 +1,102 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.disruption;
+
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
+import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class NetworkDisruptionTests extends ESTestCase {
+
+    public void testTwoPartitions() {
+        Set<String> partition1 = generateRandomStringSet(1, 10);
+        Set<String> partition2 = generateRandomStringSet(1, 10);
+        TwoPartitions topology = new TwoPartitions(partition1, partition2);
+        checkTwoPartitions(topology, partition1, partition2);
+    }
+
+    public void testRandomTwoPartitions() {
+        TwoPartitions topology = TwoPartitions.random(random(), generateRandomStringSet(2, 20));
+        Set<String> partition1 = topology.getNodesSideOne();
+        Set<String> partition2 = topology.getNodesSideTwo();
+        checkTwoPartitions(topology, partition1, partition2);
+    }
+
+    private void checkTwoPartitions(TwoPartitions topology, Set<String> partition1, Set<String> partition2) {
+        for (int i = 0; i < 10; i++) {
+            assertTrue(topology.disrupt(randomFrom(partition1), randomFrom(partition2)));
+            assertTrue(topology.disrupt(randomFrom(partition2), randomFrom(partition1)));
+            assertFalse(topology.disrupt(randomFrom(partition1), randomFrom(partition1)));
+            assertFalse(topology.disrupt(randomFrom(partition2), randomFrom(partition2)));
+            assertFalse(topology.disrupt(randomAsciiOfLength(10), randomFrom(partition1)));
+            assertFalse(topology.disrupt(randomAsciiOfLength(10), randomFrom(partition2)));
+            assertFalse(topology.disrupt(randomFrom(partition1), randomAsciiOfLength(10)));
+            assertFalse(topology.disrupt(randomFrom(partition2), randomAsciiOfLength(10)));
+        }
+        assertTrue(topology.getMajoritySide().size() >= topology.getMinoritySide().size());
+    }
+
+    public void testBridge() {
+        Set<String> partition1 = generateRandomStringSet(1, 10);
+        Set<String> partition2 = generateRandomStringSet(1, 10);
+        String bridgeNode = randomAsciiOfLength(10);
+        Bridge topology = new Bridge(bridgeNode, partition1, partition2);
+        checkBridge(topology, bridgeNode, partition1, partition2);
+    }
+
+    public void testRandomBridge() {
+        Bridge topology = Bridge.random(random(), generateRandomStringSet(3, 20));
+        String bridgeNode = topology.getBridgeNode();
+        Set<String> partition1 = topology.getNodesSideOne();
+        Set<String> partition2 = topology.getNodesSideTwo();
+        checkBridge(topology, bridgeNode, partition1, partition2);
+    }
+
+    private void checkBridge(Bridge topology, String bridgeNode, Set<String> partition1, Set<String> partition2) {
+        for (int i = 0; i < 10; i++) {
+            assertTrue(topology.disrupt(randomFrom(partition1), randomFrom(partition2)));
+            assertTrue(topology.disrupt(randomFrom(partition2), randomFrom(partition1)));
+            assertFalse(topology.disrupt(randomFrom(partition1), randomFrom(partition1)));
+            assertFalse(topology.disrupt(randomFrom(partition1), bridgeNode));
+            assertFalse(topology.disrupt(bridgeNode, randomFrom(partition1)));
+            assertFalse(topology.disrupt(randomFrom(partition2), randomFrom(partition2)));
+            assertFalse(topology.disrupt(randomFrom(partition2), bridgeNode));
+            assertFalse(topology.disrupt(bridgeNode, randomFrom(partition2)));
+            assertFalse(topology.disrupt(randomAsciiOfLength(10), randomFrom(partition1)));
+            assertFalse(topology.disrupt(randomAsciiOfLength(10), randomFrom(partition2)));
+            assertFalse(topology.disrupt(randomAsciiOfLength(10), bridgeNode));
+            assertFalse(topology.disrupt(randomFrom(partition1), randomAsciiOfLength(10)));
+            assertFalse(topology.disrupt(randomFrom(partition2), randomAsciiOfLength(10)));
+            assertFalse(topology.disrupt(bridgeNode, randomAsciiOfLength(10)));
+        }
+    }
+
+    private Set<String> generateRandomStringSet(int minSize, int maxSize) {
+        assert maxSize >= minSize;
+        Set<String> result = new HashSet<>();
+        for (int i = 0; i < minSize + randomInt(maxSize - minSize); i++) {
+            result.add(randomAsciiOfLength(10));
+        }
+        return result;
+    }
+}

+ 0 - 204
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkPartition.java

@@ -1,204 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.test.disruption;
-
-import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.test.InternalTestCluster;
-import org.elasticsearch.test.transport.MockTransportService;
-import org.elasticsearch.transport.TransportService;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-
-import static org.junit.Assert.assertFalse;
-
-public abstract class NetworkPartition implements ServiceDisruptionScheme {
-
-    protected final ESLogger logger = Loggers.getLogger(getClass());
-
-    final Set<String> nodesSideOne;
-    final Set<String> nodesSideTwo;
-    volatile boolean autoExpand;
-    protected final Random random;
-    protected volatile InternalTestCluster cluster;
-    protected volatile boolean activeDisruption = false;
-
-
-    public NetworkPartition(Random random) {
-        this.random = new Random(random.nextLong());
-        nodesSideOne = new HashSet<>();
-        nodesSideTwo = new HashSet<>();
-        autoExpand = true;
-    }
-
-    public NetworkPartition(String node1, String node2, Random random) {
-        this(random);
-        nodesSideOne.add(node1);
-        nodesSideTwo.add(node2);
-        autoExpand = false;
-    }
-
-    public NetworkPartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
-        this(random);
-        this.nodesSideOne.addAll(nodesSideOne);
-        this.nodesSideTwo.addAll(nodesSideTwo);
-        autoExpand = false;
-    }
-
-
-    public Collection<String> getNodesSideOne() {
-        return Collections.unmodifiableCollection(nodesSideOne);
-    }
-
-    public Collection<String> getNodesSideTwo() {
-        return Collections.unmodifiableCollection(nodesSideTwo);
-    }
-
-    public Collection<String> getMajoritySide() {
-        if (nodesSideOne.size() >= nodesSideTwo.size()) {
-            return getNodesSideOne();
-        } else {
-            return getNodesSideTwo();
-        }
-    }
-
-    public Collection<String> getMinoritySide() {
-        if (nodesSideOne.size() >= nodesSideTwo.size()) {
-            return getNodesSideTwo();
-        } else {
-            return getNodesSideOne();
-        }
-    }
-
-    @Override
-    public void applyToCluster(InternalTestCluster cluster) {
-        this.cluster = cluster;
-        if (autoExpand) {
-            for (String node : cluster.getNodeNames()) {
-                applyToNode(node, cluster);
-            }
-        }
-    }
-
-    @Override
-    public void removeFromCluster(InternalTestCluster cluster) {
-        stopDisrupting();
-    }
-
-    @Override
-    public void removeAndEnsureHealthy(InternalTestCluster cluster) {
-        removeFromCluster(cluster);
-        ensureNodeCount(cluster);
-    }
-
-    protected void ensureNodeCount(InternalTestCluster cluster) {
-        assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth()
-                .setWaitForNodes("" + cluster.size())
-                .setWaitForRelocatingShards(0)
-                .get().isTimedOut());
-    }
-
-    @Override
-    public synchronized void applyToNode(String node, InternalTestCluster cluster) {
-        if (!autoExpand || nodesSideOne.contains(node) || nodesSideTwo.contains(node)) {
-            return;
-        }
-        if (nodesSideOne.isEmpty()) {
-            nodesSideOne.add(node);
-        } else if (nodesSideTwo.isEmpty()) {
-            nodesSideTwo.add(node);
-        } else if (random.nextBoolean()) {
-            nodesSideOne.add(node);
-        } else {
-            nodesSideTwo.add(node);
-        }
-    }
-
-    @Override
-    public synchronized void removeFromNode(String node, InternalTestCluster cluster) {
-        MockTransportService transportService = (MockTransportService) cluster.getInstance(TransportService.class, node);
-        Set<String> otherSideNodes;
-        if (nodesSideOne.contains(node)) {
-            otherSideNodes = nodesSideTwo;
-            nodesSideOne.remove(node);
-        } else if (nodesSideTwo.contains(node)) {
-            otherSideNodes = nodesSideOne;
-            nodesSideTwo.remove(node);
-        } else {
-            return;
-        }
-        for (String node2 : otherSideNodes) {
-            MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
-            removeDisruption(transportService, transportService2);
-        }
-    }
-
-    @Override
-    public synchronized void testClusterClosed() {
-
-    }
-
-    protected abstract String getPartitionDescription();
-
-    @Override
-    public synchronized void startDisrupting() {
-        if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0) {
-            return;
-        }
-        logger.info("nodes {} will be partitioned from {}. partition type [{}]", nodesSideOne, nodesSideTwo, getPartitionDescription());
-        activeDisruption = true;
-        for (String node1 : nodesSideOne) {
-            MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
-            for (String node2 : nodesSideTwo) {
-                MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
-                applyDisruption(transportService1, transportService2);
-            }
-        }
-    }
-
-
-    @Override
-    public synchronized void stopDisrupting() {
-        if (nodesSideOne.size() == 0 || nodesSideTwo.size() == 0 || !activeDisruption) {
-            return;
-        }
-        logger.info("restoring partition between nodes {} & nodes {}", nodesSideOne, nodesSideTwo);
-        for (String node1 : nodesSideOne) {
-            MockTransportService transportService1 = (MockTransportService) cluster.getInstance(TransportService.class, node1);
-            for (String node2 : nodesSideTwo) {
-                MockTransportService transportService2 = (MockTransportService) cluster.getInstance(TransportService.class, node2);
-                removeDisruption(transportService1, transportService2);
-            }
-        }
-        activeDisruption = false;
-    }
-
-    abstract void applyDisruption(MockTransportService transportService1, MockTransportService transportService2);
-
-
-    protected void removeDisruption(MockTransportService transportService1, MockTransportService transportService2) {
-        transportService1.clearRule(transportService2);
-        transportService2.clearRule(transportService1);
-    }
-
-}

+ 0 - 56
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkUnresponsivePartition.java

@@ -1,56 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.test.disruption;
-
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.test.transport.MockTransportService;
-
-import java.util.Random;
-import java.util.Set;
-
-public class NetworkUnresponsivePartition extends NetworkPartition {
-
-    public NetworkUnresponsivePartition(Random random) {
-        super(random);
-    }
-
-    public NetworkUnresponsivePartition(String node1, String node2, Random random) {
-        super(node1, node2, random);
-    }
-
-    public NetworkUnresponsivePartition(Set<String> nodesSideOne, Set<String> nodesSideTwo, Random random) {
-        super(nodesSideOne, nodesSideTwo, random);
-    }
-
-    @Override
-    protected String getPartitionDescription() {
-        return "unresponsive";
-    }
-
-    @Override
-    void applyDisruption(MockTransportService transportService1, MockTransportService transportService2) {
-        transportService1.addUnresponsiveRule(transportService2);
-        transportService2.addUnresponsiveRule(transportService1);
-    }
-
-    @Override
-    public TimeValue expectedTimeToHeal() {
-        return TimeValue.timeValueSeconds(0);
-    }
-}