浏览代码

Add connection accounting tests (#85966)

Nikola Grcevski 3 年之前
父节点
当前提交
a3cbf12cb5

+ 9 - 0
server/src/main/java/org/elasticsearch/tasks/TaskManager.java

@@ -272,6 +272,15 @@ public class TaskManager implements ClusterStateApplier {
         return null;
     }
 
+    // package private for testing
+    Integer childTasksPerConnection(long taskId, Transport.Connection childConnection) {
+        final CancellableTaskHolder holder = cancellableTasks.get(taskId);
+        if (holder != null) {
+            return holder.childTasksPerConnection.get(childConnection);
+        }
+        return null;
+    }
+
     /**
      * Stores the task failure
      */

+ 84 - 0
server/src/test/java/org/elasticsearch/tasks/TaskManagerTests.java

@@ -11,6 +11,7 @@ package org.elasticsearch.tasks;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.tasks.TransportTasksActionTests;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.network.CloseableChannel;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -24,7 +25,10 @@ import org.elasticsearch.transport.FakeTcpChannel;
 import org.elasticsearch.transport.TcpChannel;
 import org.elasticsearch.transport.TcpTransportChannel;
 import org.elasticsearch.transport.TestTransportChannels;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 import org.junit.After;
 import org.junit.Before;
@@ -230,6 +234,32 @@ public class TaskManagerTests extends ESTestCase {
         assertThat(taskManager.numberOfChannelPendingTaskTrackers(), equalTo(0));
     }
 
+    public void testTaskAccounting() {
+        final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Set.of());
+
+        final Task task1 = taskManager.register("transport", "test", new CancellableRequest("thread 1"));
+        final Task task2 = taskManager.register("transport", "test", new CancellableRequest("thread 2"));
+
+        final MockConnection connection1 = new MockConnection();
+        final MockConnection connection2 = new MockConnection();
+
+        Releasable releasableConnection1 = taskManager.registerChildConnection(task1.getId(), connection1);
+        Releasable releasableConnection2 = taskManager.registerChildConnection(task2.getId(), connection2);
+        Releasable releasableConnection3 = taskManager.registerChildConnection(task1.getId(), connection1);
+
+        assertEquals(2, taskManager.childTasksPerConnection(task1.getId(), connection1).intValue());
+        assertEquals(1, taskManager.childTasksPerConnection(task2.getId(), connection2).intValue());
+
+        releasableConnection1.close();
+        assertEquals(1, taskManager.childTasksPerConnection(task1.getId(), connection1).intValue());
+
+        releasableConnection2.close();
+        assertNull(taskManager.childTasksPerConnection(task2.getId(), connection2));
+
+        releasableConnection3.close();
+        assertNull(taskManager.childTasksPerConnection(task1.getId(), connection1));
+    }
+
     static class CancellableRequest extends TransportRequest {
         private final String requestId;
 
@@ -265,4 +295,58 @@ public class TaskManagerTests extends ESTestCase {
             super.addCloseListener(listener);
         }
     }
+
+    public static final class MockConnection implements Transport.Connection {
+        @Override
+        public DiscoveryNode getNode() {
+            return null;
+        }
+
+        @Override
+        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
+            throws TransportException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void addCloseListener(ActionListener<Void> listener) {}
+
+        @Override
+        public void addRemovedListener(ActionListener<Void> listener) {}
+
+        @Override
+        public boolean isClosed() {
+            return false;
+        }
+
+        @Override
+        public void close() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void onRemoved() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void incRef() {}
+
+        @Override
+        public boolean tryIncRef() {
+            return true;
+        }
+
+        @Override
+        public boolean decRef() {
+            assert false : "shouldn't release a mock connection";
+            return false;
+        }
+
+        @Override
+        public boolean hasReferences() {
+            return true;
+        }
+    }
+
 }