Browse Source

Remove ban parent-task BWC (#66525)

This BWC is no longer needed after backporting #66066 to 7.x.

Closes #66518
Nhat Nguyen 4 years ago
parent
commit
67bf9fd42b

+ 12 - 65
server/src/main/java/org/elasticsearch/tasks/TaskManager.java

@@ -29,7 +29,6 @@ import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
@@ -61,7 +60,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -383,15 +381,6 @@ public class TaskManager implements ClusterStateApplier {
         }
     }
 
-    /**
-     * Returns the number of currently banned tasks.
-     * <p>
-     * Will be used in task manager stats and for debugging.
-     */
-    public int getBanCount() {
-        return bannedParents.size();
-    }
-
     /**
      * Bans all tasks with the specified parent task from execution, cancels all tasks that are currently executing.
      * <p>
@@ -401,24 +390,15 @@ public class TaskManager implements ClusterStateApplier {
     public List<CancellableTask> setBan(TaskId parentTaskId, String reason, TransportChannel channel) {
         logger.trace("setting ban for the parent task {} {}", parentTaskId, reason);
         synchronized (bannedParents) {
-            if (channel.getVersion().onOrAfter(Version.V_8_0_0)) {
-                final Ban ban = bannedParents.computeIfAbsent(parentTaskId, k -> new Ban(reason, true));
-                assert ban.perChannel : "not a ban per channel";
-                while (channel instanceof TaskTransportChannel) {
-                    channel = ((TaskTransportChannel) channel).getChannel();
-                }
-                if (channel instanceof TcpTransportChannel) {
-                    startTrackingChannel(((TcpTransportChannel) channel).getChannel(), ban::registerChannel);
-                } else {
-                    assert channel.getChannelType().equals("direct") : "expect direct channel; got [" + channel + "]";
-                    ban.registerChannel(DIRECT_CHANNEL_TRACKER);
-                }
+            final Ban ban = bannedParents.computeIfAbsent(parentTaskId, k -> new Ban(reason));
+            while (channel instanceof TaskTransportChannel) {
+                channel = ((TaskTransportChannel) channel).getChannel();
+            }
+            if (channel instanceof TcpTransportChannel) {
+                startTrackingChannel(((TcpTransportChannel) channel).getChannel(), ban::registerChannel);
             } else {
-                if (lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId())) {
-                    // Only set the ban if the node is the part of the cluster
-                    final Ban existing = bannedParents.put(parentTaskId, new Ban(reason, false));
-                    assert existing == null || existing.perChannel == false : "not a ban per node";
-                }
+                assert channel.getChannelType().equals("direct") : "expect direct channel; got [" + channel + "]";
+                ban.registerChannel(DIRECT_CHANNEL_TRACKER);
             }
         }
         return cancellableTasks.values().stream()
@@ -444,41 +424,32 @@ public class TaskManager implements ClusterStateApplier {
 
     private class Ban {
         final String reason;
-        final boolean perChannel; // TODO: Remove this in 8.0
         final Set<ChannelPendingTaskTracker> channels;
 
-        Ban(String reason, boolean perChannel) {
+        Ban(String reason) {
             assert Thread.holdsLock(bannedParents);
             this.reason = reason;
-            this.perChannel = perChannel;
-            if (perChannel) {
-                this.channels = new HashSet<>();
-            } else {
-                this.channels = Set.of();
-            }
+            this.channels = new HashSet<>();
         }
 
         void registerChannel(ChannelPendingTaskTracker channel) {
             assert Thread.holdsLock(bannedParents);
-            assert perChannel : "not a ban per channel";
             channels.add(channel);
         }
 
         boolean unregisterChannel(ChannelPendingTaskTracker channel) {
             assert Thread.holdsLock(bannedParents);
-            assert perChannel : "not a ban per channel";
             return channels.remove(channel);
         }
 
         int registeredChannels() {
             assert Thread.holdsLock(bannedParents);
-            assert perChannel : "not a ban per channel";
             return channels.size();
         }
 
         @Override
         public String toString() {
-            return "Ban{" + "reason='" + reason + '\'' + ", perChannel=" + perChannel + ", channels=" + channels + '}';
+            return "Ban{" + "reason=" + reason + ", channels=" + channels + '}';
         }
     }
 
@@ -502,21 +473,6 @@ public class TaskManager implements ClusterStateApplier {
     @Override
     public void applyClusterState(ClusterChangedEvent event) {
         lastDiscoveryNodes = event.state().getNodes();
-        if (event.nodesRemoved()) {
-            synchronized (bannedParents) {
-                lastDiscoveryNodes = event.state().getNodes();
-                // Remove all bans that were registered by nodes that are no longer in the cluster state
-                final Iterator<Map.Entry<TaskId, Ban>> banIterator = bannedParents.entrySet().iterator();
-                while (banIterator.hasNext()) {
-                    final Map.Entry<TaskId, Ban> ban = banIterator.next();
-                    if (ban.getValue().perChannel == false && lastDiscoveryNodes.nodeExists(ban.getKey().getNodeId()) == false) {
-                        logger.debug("Removing ban for the parent [{}] on the node [{}], reason: the parent node is gone",
-                            ban.getKey(), event.state().getNodes().getLocalNode());
-                        banIterator.remove();
-                    }
-                }
-            }
-        }
     }
 
     /**
@@ -764,16 +720,7 @@ public class TaskManager implements ClusterStateApplier {
 
         // Unregister the closing channel and remove bans whose has no registered channels
         synchronized (bannedParents) {
-            final Iterator<Map.Entry<TaskId, Ban>> iterator = bannedParents.entrySet().iterator();
-            while (iterator.hasNext()) {
-                final Map.Entry<TaskId, Ban> entry = iterator.next();
-                final Ban ban = entry.getValue();
-                if (ban.perChannel) {
-                    if (ban.unregisterChannel(channel) && entry.getValue().registeredChannels() == 0) {
-                        iterator.remove();
-                    }
-                }
-            }
+            bannedParents.values().removeIf(ban -> ban.unregisterChannel(channel) && ban.registeredChannels() == 0);
         }
     }
 

+ 17 - 23
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java

@@ -29,8 +29,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.nodes.BaseNodesRequest;
-import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -61,7 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.elasticsearch.test.ClusterServiceUtils.setState;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -298,7 +296,7 @@ public class CancellableTasksTests extends TaskManagerTestCase {
         // while the ban is still there, but it should disappear shortly
         assertBusy(() -> {
             for (int i = 0; i < testNodes.length; i++) {
-                assertEquals("No bans on the node " + i, 0, testNodes[i].transportService.getTaskManager().getBanCount());
+                assertThat("No bans on the node " + i, testNodes[i].transportService.getTaskManager().getBannedTaskIds(), empty());
             }
         });
     }
@@ -376,8 +374,7 @@ public class CancellableTasksTests extends TaskManagerTestCase {
         assertTrue("onChildTasksCompleted() is not invoked", latch.await(1, TimeUnit.SECONDS));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66518")
-    public void testTaskCancellationOnCoordinatingNodeLeavingTheCluster() throws Exception {
+    public void testTaskCancelledWhenConnectionClose() throws Exception {
         setupTestNodes(Settings.EMPTY);
         connectNodes(testNodes);
         CountDownLatch responseLatch = new CountDownLatch(1);
@@ -412,17 +409,10 @@ public class CancellableTasksTests extends TaskManagerTestCase {
             new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId())));
         assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size()));
 
-        // Simulate the coordinating node leaving the cluster
+        // Simulate connections close
         if (randomBoolean()) {
-            DiscoveryNode[] discoveryNodes = new DiscoveryNode[testNodes.length - 1];
             for (int i = 1; i < testNodes.length; i++) {
-                discoveryNodes[i - 1] = testNodes[i].discoveryNode();
-            }
-            DiscoveryNode master = discoveryNodes[0];
-            for (int i = 1; i < testNodes.length; i++) {
-                // Notify only nodes that should remain in the cluster
-                setState(testNodes[i].clusterService,
-                    ClusterStateCreationUtils.state(testNodes[i].discoveryNode(), master, discoveryNodes));
+                testNodes[i].transportService.disconnectFromNode(testNodes[0].discoveryNode());
             }
             if (randomBoolean()) {
                 logger.info("--> Simulate issuing cancel request on the node that is about to leave the cluster");
@@ -434,20 +424,14 @@ public class CancellableTasksTests extends TaskManagerTestCase {
                 CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request);
                 logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster");
                 // This node still thinks that's part of the cluster, so cancelling should look successful
-                if (response.getTasks().size() == 0) {
-                    logger.error("!!!!");
-                }
                 assertThat(response.getTasks().size(), lessThanOrEqualTo(1));
                 assertThat(response.getTaskFailures().size(), lessThanOrEqualTo(1));
                 assertThat(response.getTaskFailures().size() + response.getTasks().size(), lessThanOrEqualTo(1));
             }
         }
 
-        for (int i = 1; i < testNodes.length; i++) {
-            assertEquals("No bans on the node " + i, 0, testNodes[i].transportService.getTaskManager().getBanCount());
-        }
-
-        if (randomBoolean()) {
+        boolean mainNodeClosed = randomBoolean();
+        if (mainNodeClosed) {
             testNodes[0].close();
         } else {
             for (TestNode blockOnNode : blockOnNodes) {
@@ -469,6 +453,16 @@ public class CancellableTasksTests extends TaskManagerTestCase {
 
         // Wait for clean up
         responseLatch.await();
+        assertBusy(() -> {
+            // If the main node is closed, then we won't able to send unban requests to remove bans, but a direct channel never closes.
+            // Hence, we can't verify if all bans are removed.
+            if (mainNodeClosed == false) {
+                assertThat("No bans on the node " + 0, testNodes[0].transportService.getTaskManager().getBannedTaskIds(), empty());
+            }
+            for (int i = 1; i < testNodes.length; i++) {
+                assertThat("No bans on the node " + i, testNodes[i].transportService.getTaskManager().getBannedTaskIds(), empty());
+            }
+        });
     }
 
     public void testNonExistingTaskCancellation() throws Exception {