Browse Source

Drop node if asymmetrically partitioned from master (#39598)

When a node is joining the cluster we ensure that it can send requests to the
master _at that time_. If it joins the cluster and _then_ loses the ability to
send requests to the master then it should be removed from the cluster. Today
this is not the case: the master can still receive responses to its follower
checks, and receives acknowledgements to cluster state publications, so has no
reason to remove the node.

This commit changes the handling of follower checks so that they fail if they
come from a master that the other node was following but which it now believes
to have failed.
David Turner 6 years ago
parent
commit
2d28d7bb4d

+ 9 - 1
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -221,6 +221,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             // where we would possibly have to remove the NO_MASTER_BLOCK from the applierState when turning a candidate back to follower.
             if (getLastAcceptedState().term() < getCurrentTerm()) {
                 becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender());
+            } else if (mode == Mode.FOLLOWER) {
+                logger.trace("onFollowerCheckRequest: responding successfully to {}", followerCheckRequest);
+            } else if (joinHelper.isJoinPending()) {
+                logger.trace("onFollowerCheckRequest: rejoining master, responding successfully to {}", followerCheckRequest);
+            } else {
+                logger.trace("onFollowerCheckRequest: received check from faulty master, rejecting {}", followerCheckRequest);
+                throw new CoordinationStateRejectedException(
+                    "onFollowerCheckRequest: received check from faulty master, rejecting " + followerCheckRequest);
             }
         }
     }
@@ -436,7 +444,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
     // package private for tests
     void sendValidateJoinRequest(ClusterState stateForJoinValidation, JoinRequest joinRequest,
-                                        JoinHelper.JoinCallback joinCallback) {
+                                 JoinHelper.JoinCallback joinCallback) {
         // validate the join on the joining node, will throw a failure if it fails the validation
         joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener<Empty>() {
             @Override

+ 8 - 10
server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

@@ -82,10 +82,10 @@ public class JoinHelper {
 
     final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();
 
-    public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
-                      TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
-                      BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
-                      Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
+    JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
+               TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
+               BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
+               Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
         this.masterService = masterService;
         this.transportService = transportService;
         this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
@@ -167,12 +167,12 @@ public class JoinHelper {
         };
     }
 
-    public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
-        sendJoinRequest(destination, optionalJoin, () -> {
-        });
+    boolean isJoinPending() {
+        // cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized.
+        return pendingOutgoingJoins.iterator().hasNext();
     }
 
-    public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin, Runnable onCompletion) {
+    void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
         assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
         final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
         final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
@@ -190,14 +190,12 @@ public class JoinHelper {
                     public void handleResponse(Empty response) {
                         pendingOutgoingJoins.remove(dedupKey);
                         logger.debug("successfully joined {} with {}", destination, joinRequest);
-                        onCompletion.run();
                     }
 
                     @Override
                     public void handleException(TransportException exp) {
                         pendingOutgoingJoins.remove(dedupKey);
                         logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
-                        onCompletion.run();
                     }
 
                     @Override

+ 46 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -918,8 +918,9 @@ public class CoordinatorTests extends ESTestCase {
                 nonLeader.coordinator.becomeCandidate("forced");
             }
             logger.debug("simulate follower check coming through from {} to {}", leader.getId(), nonLeader.getId());
-            nonLeader.coordinator.onFollowerCheckRequest(new FollowersChecker.FollowerCheckRequest(leader.coordinator.getCurrentTerm(),
-                leader.getLocalNode()));
+            expectThrows(CoordinationStateRejectedException.class, () -> nonLeader.coordinator.onFollowerCheckRequest(
+                new FollowersChecker.FollowerCheckRequest(leader.coordinator.getCurrentTerm(), leader.getLocalNode())));
+            assertThat(nonLeader.coordinator.getMode(), equalTo(CANDIDATE));
         }).run();
         cluster.stabilise();
     }
@@ -1080,6 +1081,38 @@ public class CoordinatorTests extends ESTestCase {
         cluster.stabilise();
     }
 
+    public void testFollowerRemovedIfUnableToSendRequestsToMaster() {
+        final Cluster cluster = new Cluster(3);
+        cluster.runRandomly();
+        cluster.stabilise();
+
+        final ClusterNode leader = cluster.getAnyLeader();
+        final ClusterNode otherNode = cluster.getAnyNodeExcept(leader);
+
+        cluster.blackholeConnectionsFrom(otherNode, leader);
+
+        cluster.runFor(
+            (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
+                * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
+                + (defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + DEFAULT_DELAY_VARIABILITY)
+                * defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
+                + DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
+            "awaiting removal of asymmetrically-partitioned node");
+
+        assertThat(leader.getLastAppliedClusterState().nodes().toString(),
+            leader.getLastAppliedClusterState().nodes().getSize(), equalTo(2));
+
+        cluster.clearBlackholedConnections();
+
+        cluster.stabilise(
+            // time for the disconnected node to find the master again
+            defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2
+                // time for joining
+                + 4 * DEFAULT_DELAY_VARIABILITY
+                // Then a commit of the updated cluster state
+                + DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
+    }
+
     private static long defaultMillis(Setting<TimeValue> setting) {
         return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
     }
@@ -1142,6 +1175,7 @@ public class CoordinatorTests extends ESTestCase {
 
         private final Set<String> disconnectedNodes = new HashSet<>();
         private final Set<String> blackholedNodes = new HashSet<>();
+        private final Set<Tuple<String,String>> blackholedConnections = new HashSet<>();
         private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
         private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
         private final History history = new History();
@@ -1509,6 +1543,8 @@ public class CoordinatorTests extends ESTestCase {
                 connectionStatus = ConnectionStatus.BLACK_HOLE;
             } else if (disconnectedNodes.contains(sender.getId()) || disconnectedNodes.contains(destination.getId())) {
                 connectionStatus = ConnectionStatus.DISCONNECTED;
+            } else if (blackholedConnections.contains(Tuple.tuple(sender.getId(), destination.getId()))) {
+                connectionStatus = ConnectionStatus.BLACK_HOLE_REQUESTS_ONLY;
             } else if (nodeExists(sender) && nodeExists(destination)) {
                 connectionStatus = ConnectionStatus.CONNECTED;
             } else {
@@ -1559,6 +1595,14 @@ public class CoordinatorTests extends ESTestCase {
             seedHostsList = emptyList();
         }
 
+        void blackholeConnectionsFrom(ClusterNode sender, ClusterNode destination) {
+            blackholedConnections.add(Tuple.tuple(sender.getId(), destination.getId()));
+        }
+
+        void clearBlackholedConnections() {
+            blackholedConnections.clear();
+        }
+
         class MockPersistedState implements PersistedState {
             private final PersistedState delegate;
             private final NodeEnvironment nodeEnvironment;

+ 11 - 0
server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

@@ -51,6 +51,8 @@ public class JoinHelperTests extends ESTestCase {
         DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
         DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT);
 
+        assertFalse(joinHelper.isJoinPending());
+
         // check that sending a join to node1 works
         Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() :
             Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
@@ -60,6 +62,8 @@ public class JoinHelperTests extends ESTestCase {
         CapturedRequest capturedRequest1 = capturedRequests1[0];
         assertEquals(node1, capturedRequest1.node);
 
+        assertTrue(joinHelper.isJoinPending());
+
         // check that sending a join to node2 works
         Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() :
             Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
@@ -95,5 +99,12 @@ public class JoinHelperTests extends ESTestCase {
         assertThat(capturedRequests2a.length, equalTo(1));
         CapturedRequest capturedRequest2a = capturedRequests2a[0];
         assertEquals(node2, capturedRequest2a.node);
+
+        // complete all the joins and check that isJoinPending is updated
+        assertTrue(joinHelper.isJoinPending());
+        capturingTransport.handleRemoteError(capturedRequest2.requestId, new CoordinationStateRejectedException("dummy"));
+        capturingTransport.handleRemoteError(capturedRequest1a.requestId, new CoordinationStateRejectedException("dummy"));
+        capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy"));
+        assertFalse(joinHelper.isJoinPending());
     }
 }

+ 56 - 13
test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java

@@ -116,8 +116,10 @@ public abstract class DisruptableMockTransport extends MockTransport {
         destinationTransport.execute(action, new Runnable() {
             @Override
             public void run() {
-                switch (getConnectionStatus(destinationTransport.getLocalNode())) {
+                final ConnectionStatus connectionStatus = getConnectionStatus(destinationTransport.getLocalNode());
+                switch (connectionStatus) {
                     case BLACK_HOLE:
+                    case BLACK_HOLE_REQUESTS_ONLY:
                         onBlackholedDuringSend(requestId, action, destinationTransport);
                         break;
 
@@ -128,6 +130,9 @@ public abstract class DisruptableMockTransport extends MockTransport {
                     case CONNECTED:
                         onConnectedDuringSend(requestId, action, request, destinationTransport);
                         break;
+
+                    default:
+                        throw new AssertionError("unexpected status: " + connectionStatus);
                 }
             }
 
@@ -197,11 +202,20 @@ public abstract class DisruptableMockTransport extends MockTransport {
                 execute(action, new Runnable() {
                     @Override
                     public void run() {
-                        if (destinationTransport.getConnectionStatus(getLocalNode()) != ConnectionStatus.CONNECTED) {
-                            logger.trace("dropping response to {}: channel is not CONNECTED",
-                                requestDescription);
-                        } else {
-                            handleResponse(requestId, response);
+                        final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(getLocalNode());
+                        switch (connectionStatus) {
+                            case CONNECTED:
+                            case BLACK_HOLE_REQUESTS_ONLY:
+                                handleResponse(requestId, response);
+                                break;
+
+                            case BLACK_HOLE:
+                            case DISCONNECTED:
+                                logger.trace("dropping response to {}: channel is {}", requestDescription, connectionStatus);
+                                break;
+
+                            default:
+                                throw new AssertionError("unexpected status: " + connectionStatus);
                         }
                     }
 
@@ -217,11 +231,20 @@ public abstract class DisruptableMockTransport extends MockTransport {
                 execute(action, new Runnable() {
                     @Override
                     public void run() {
-                        if (destinationTransport.getConnectionStatus(getLocalNode()) != ConnectionStatus.CONNECTED) {
-                            logger.trace("dropping response to {}: channel is not CONNECTED",
-                                requestDescription);
-                        } else {
-                            handleRemoteError(requestId, exception);
+                        final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(getLocalNode());
+                        switch (connectionStatus) {
+                            case CONNECTED:
+                            case BLACK_HOLE_REQUESTS_ONLY:
+                                handleRemoteError(requestId, exception);
+                                break;
+
+                            case BLACK_HOLE:
+                            case DISCONNECTED:
+                                logger.trace("dropping exception response to {}: channel is {}", requestDescription, connectionStatus);
+                                break;
+
+                            default:
+                                throw new AssertionError("unexpected status: " + connectionStatus);
                         }
                     }
 
@@ -251,9 +274,29 @@ public abstract class DisruptableMockTransport extends MockTransport {
         }
     }
 
+    /**
+     * Response type from {@link DisruptableMockTransport#getConnectionStatus(DiscoveryNode)} indicating whether, and how, messages should
+     * be disrupted on this transport.
+     */
     public enum ConnectionStatus {
+        /**
+         * No disruption: deliver messages normally.
+         */
         CONNECTED,
-        DISCONNECTED, // network requests to or from this node throw a ConnectTransportException
-        BLACK_HOLE // network traffic to or from the corresponding node is silently discarded
+
+        /**
+         * Simulate disconnection: inbound and outbound messages throw a {@link ConnectTransportException}.
+         */
+        DISCONNECTED,
+
+        /**
+         * Simulate a blackhole partition: inbound and outbound messages are silently discarded.
+         */
+        BLACK_HOLE,
+
+        /**
+         * Simulate an asymmetric partition: outbound messages are silently discarded, but inbound messages are delivered normally.
+         */
+        BLACK_HOLE_REQUESTS_ONLY
     }
 }

+ 64 - 22
test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java

@@ -56,29 +56,32 @@ import static org.hamcrest.Matchers.containsString;
 
 public class DisruptableMockTransportTests extends ESTestCase {
 
-    DiscoveryNode node1;
-    DiscoveryNode node2;
+    private DiscoveryNode node1;
+    private DiscoveryNode node2;
 
-    DisruptableMockTransport transport1;
-    DisruptableMockTransport transport2;
+    private TransportService service1;
+    private TransportService service2;
 
-    TransportService service1;
-    TransportService service2;
+    private DeterministicTaskQueue deterministicTaskQueue;
 
-    DeterministicTaskQueue deterministicTaskQueue;
+    private Set<Tuple<DiscoveryNode, DiscoveryNode>> disconnectedLinks;
+    private Set<Tuple<DiscoveryNode, DiscoveryNode>> blackholedLinks;
+    private Set<Tuple<DiscoveryNode, DiscoveryNode>> blackholedRequestLinks;
 
-    Set<Tuple<DiscoveryNode, DiscoveryNode>> disconnectedLinks;
-    Set<Tuple<DiscoveryNode, DiscoveryNode>> blackholedLinks;
-
-    ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
+    private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
         Tuple<DiscoveryNode, DiscoveryNode> link = Tuple.tuple(sender, destination);
         if (disconnectedLinks.contains(link)) {
             assert blackholedLinks.contains(link) == false;
+            assert blackholedRequestLinks.contains(link) == false;
             return ConnectionStatus.DISCONNECTED;
         }
         if (blackholedLinks.contains(link)) {
+            assert blackholedRequestLinks.contains(link) == false;
             return ConnectionStatus.BLACK_HOLE;
         }
+        if (blackholedRequestLinks.contains(link)) {
+            return ConnectionStatus.BLACK_HOLE_REQUESTS_ONLY;
+        }
         return ConnectionStatus.CONNECTED;
     }
 
@@ -89,13 +92,14 @@ public class DisruptableMockTransportTests extends ESTestCase {
 
         disconnectedLinks = new HashSet<>();
         blackholedLinks = new HashSet<>();
+        blackholedRequestLinks = new HashSet<>();
 
         List<DisruptableMockTransport> transports = new ArrayList<>();
 
         deterministicTaskQueue = new DeterministicTaskQueue(
             Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random());
 
-        transport1 = new DisruptableMockTransport(node1, logger) {
+        final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger) {
             @Override
             protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
                 return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@@ -112,7 +116,7 @@ public class DisruptableMockTransportTests extends ESTestCase {
             }
         };
 
-        transport2 = new DisruptableMockTransport(node2, logger) {
+        final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger) {
             @Override
             protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
                 return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@@ -144,7 +148,6 @@ public class DisruptableMockTransportTests extends ESTestCase {
         service2.connectToNode(node1);
     }
 
-
     private TransportRequestHandler<TransportRequest.Empty> requestHandlerShouldNotBeCalled() {
         return (request, channel, task) -> {
             throw new AssertionError("should not be called");
@@ -293,15 +296,21 @@ public class DisruptableMockTransportTests extends ESTestCase {
         deterministicTaskQueue.runAllRunnableTasks();
     }
 
+    public void testUnavailableOnRequestOnly() {
+        registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
+        registerRequestHandler(service2, requestHandlerShouldNotBeCalled());
+        blackholedRequestLinks.add(Tuple.tuple(node1, node2));
+        send(service1, node2, responseHandlerShouldNotBeCalled());
+        deterministicTaskQueue.runAllRunnableTasks();
+    }
+
     public void testDisconnectedOnSuccessfulResponse() throws IOException {
         registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
         AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
         registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
 
-        AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
         send(service1, node2, responseHandlerShouldNotBeCalled());
         deterministicTaskQueue.runAllRunnableTasks();
-        assertNull(responseHandlerException.get());
         assertNotNull(responseHandlerChannel.get());
 
         disconnectedLinks.add(Tuple.tuple(node2, node1));
@@ -314,10 +323,8 @@ public class DisruptableMockTransportTests extends ESTestCase {
         AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
         registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
 
-        AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
         send(service1, node2, responseHandlerShouldNotBeCalled());
         deterministicTaskQueue.runAllRunnableTasks();
-        assertNull(responseHandlerException.get());
         assertNotNull(responseHandlerChannel.get());
 
         disconnectedLinks.add(Tuple.tuple(node2, node1));
@@ -330,10 +337,8 @@ public class DisruptableMockTransportTests extends ESTestCase {
         AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
         registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
 
-        AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
         send(service1, node2, responseHandlerShouldNotBeCalled());
         deterministicTaskQueue.runAllRunnableTasks();
-        assertNull(responseHandlerException.get());
         assertNotNull(responseHandlerChannel.get());
 
         blackholedLinks.add(Tuple.tuple(node2, node1));
@@ -346,10 +351,8 @@ public class DisruptableMockTransportTests extends ESTestCase {
         AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
         registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
 
-        AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
         send(service1, node2, responseHandlerShouldNotBeCalled());
         deterministicTaskQueue.runAllRunnableTasks();
-        assertNull(responseHandlerException.get());
         assertNotNull(responseHandlerChannel.get());
 
         blackholedLinks.add(Tuple.tuple(node2, node1));
@@ -357,4 +360,43 @@ public class DisruptableMockTransportTests extends ESTestCase {
         deterministicTaskQueue.runAllRunnableTasks();
     }
 
+    public void testUnavailableOnRequestOnlyReceivesSuccessfulResponse() throws IOException {
+        registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
+        AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
+        registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
+
+        AtomicBoolean responseHandlerCalled = new AtomicBoolean();
+        send(service1, node2, responseHandlerShouldBeCalledNormally(() -> responseHandlerCalled.set(true)));
+
+        deterministicTaskQueue.runAllTasks();
+        assertNotNull(responseHandlerChannel.get());
+        assertFalse(responseHandlerCalled.get());
+
+        blackholedRequestLinks.add(Tuple.tuple(node1, node2));
+        blackholedRequestLinks.add(Tuple.tuple(node2, node1));
+        responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE);
+
+        deterministicTaskQueue.runAllRunnableTasks();
+        assertTrue(responseHandlerCalled.get());
+    }
+
+    public void testUnavailableOnRequestOnlyReceivesExceptionalResponse() throws IOException {
+        registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
+        AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
+        registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
+
+        AtomicBoolean responseHandlerCalled = new AtomicBoolean();
+        send(service1, node2, responseHandlerShouldBeCalledExceptionally(e -> responseHandlerCalled.set(true)));
+
+        deterministicTaskQueue.runAllTasks();
+        assertNotNull(responseHandlerChannel.get());
+        assertFalse(responseHandlerCalled.get());
+
+        blackholedRequestLinks.add(Tuple.tuple(node1, node2));
+        blackholedRequestLinks.add(Tuple.tuple(node2, node1));
+        responseHandlerChannel.get().sendResponse(new Exception());
+
+        deterministicTaskQueue.runAllRunnableTasks();
+        assertTrue(responseHandlerCalled.get());
+    }
 }