Explorar o código

Correct context for ClusterConnManager listener (#83035)

Today `ClusterConnectionManager#connectToNode` completes its listeners
in the thread context in which the connection completes, which may not
be the correct context if there are multiple concurrent connection
attempts. With this commit we make sure to complete each listener in the
context in which it was passed to the corresponding call to
`connectToNode`.

Co-authored-by: ievgen.degtiarenko <ievgen.degtiarenko@elastic.co>
David Turner %!s(int64=3) %!d(string=hai) anos
pai
achega
39b3cb8fe1
Modificáronse 31 ficheiros con 243 adicións e 107 borrados
  1. 5 0
      docs/changelog/83035.yaml
  2. 14 4
      server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java
  3. 1 1
      server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
  4. 1 1
      server/src/main/java/org/elasticsearch/transport/TransportService.java
  5. 2 1
      server/src/test/java/org/elasticsearch/action/main/MainActionTests.java
  6. 1 1
      server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java
  7. 1 1
      server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
  8. 3 2
      server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java
  9. 9 2
      server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java
  10. 33 22
      server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
  11. 35 7
      server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java
  12. 5 1
      server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java
  13. 18 3
      server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java
  14. 55 11
      server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java
  15. 3 1
      test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java
  16. 1 1
      test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java
  17. 1 1
      x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java
  18. 1 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java
  19. 4 3
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java
  20. 5 4
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java
  21. 6 5
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java
  22. 2 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java
  23. 2 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java
  24. 1 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java
  25. 1 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java
  26. 4 3
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java
  27. 6 5
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java
  28. 6 5
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java
  29. 6 6
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java
  30. 6 6
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java
  31. 5 5
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java

+ 5 - 0
docs/changelog/83035.yaml

@@ -0,0 +1,5 @@
+pr: 83035
+summary: Correct context for `ClusterConnManager` listener
+area: Network
+type: bug
+issues: []

+ 14 - 4
server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java

@@ -10,11 +10,13 @@ package org.elasticsearch.transport;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.RunOnce;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
@@ -44,18 +46,20 @@ public class ClusterConnectionManager implements ConnectionManager {
     private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);
 
     private final Transport transport;
+    private final ThreadContext threadContext;
     private final ConnectionProfile defaultProfile;
     private final AtomicBoolean closing = new AtomicBoolean(false);
     private final CountDownLatch closeLatch = new CountDownLatch(1);
     private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
 
-    public ClusterConnectionManager(Settings settings, Transport transport) {
-        this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport);
+    public ClusterConnectionManager(Settings settings, Transport transport, ThreadContext threadContext) {
+        this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadContext);
     }
 
-    public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport) {
+    public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadContext threadContext) {
         this.transport = transport;
         this.defaultProfile = connectionProfile;
+        this.threadContext = threadContext;
     }
 
     @Override
@@ -91,7 +95,13 @@ public class ClusterConnectionManager implements ConnectionManager {
         ConnectionValidator connectionValidator,
         ActionListener<Releasable> listener
     ) throws ConnectTransportException {
-        connectToNodeOrRetry(node, connectionProfile, connectionValidator, 0, listener);
+        connectToNodeOrRetry(
+            node,
+            connectionProfile,
+            connectionValidator,
+            0,
+            ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)
+        );
     }
 
     /**

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -197,7 +197,7 @@ final class RemoteClusterConnection implements Closeable {
     }
 
     private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
-        return new ClusterConnectionManager(connectionProfile, transportService.transport);
+        return new ClusterConnectionManager(connectionProfile, transportService.transport, transportService.threadPool.getThreadContext());
     }
 
     ConnectionManager getConnectionManager() {

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -186,7 +186,7 @@ public class TransportService extends AbstractLifecycleComponent
             localNodeFactory,
             clusterSettings,
             taskHeaders,
-            new ClusterConnectionManager(settings, transport)
+            new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())
         );
     }
 

+ 2 - 1
server/src/test/java/org/elasticsearch/action/main/MainActionTests.java

@@ -20,6 +20,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 
@@ -77,7 +78,7 @@ public class MainActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 1 - 1
server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java

@@ -122,7 +122,7 @@ public class MultiSearchActionTookTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()),
             null,

+ 1 - 1
server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -387,7 +387,7 @@ public class TransportWriteActionTests extends ESTestCase {
                 new TransportService(
                     Settings.EMPTY,
                     mock(Transport.class),
-                    null,
+                    TransportWriteActionTests.threadPool,
                     TransportService.NOOP_TRANSPORT_INTERCEPTOR,
                     x -> null,
                     null,

+ 3 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

@@ -66,15 +66,16 @@ public class JoinHelperTests extends ESTestCase {
         DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue();
         CapturingTransport capturingTransport = new HandshakingCapturingTransport();
         DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT);
+        final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             capturingTransport,
-            deterministicTaskQueue.getThreadPool(),
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> localNode,
             null,
             Collections.emptySet(),
-            new ClusterConnectionManager(Settings.EMPTY, capturingTransport)
+            new ClusterConnectionManager(Settings.EMPTY, capturingTransport, threadPool.getThreadContext())
         );
         JoinHelper joinHelper = new JoinHelper(
             Settings.EMPTY,

+ 9 - 2
server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.CapturingTransport;
 import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
 import org.elasticsearch.test.transport.StubbableConnectionManager;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ClusterConnectionManager;
 import org.elasticsearch.transport.ConnectionManager;
 import org.elasticsearch.transport.TransportException;
@@ -210,7 +211,13 @@ public class PeerFinderTests extends ESTestCase {
 
         localNode = newDiscoveryNode("local-node");
 
-        ConnectionManager innerConnectionManager = new ClusterConnectionManager(settings, capturingTransport);
+        final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
+
+        final ConnectionManager innerConnectionManager = new ClusterConnectionManager(
+            settings,
+            capturingTransport,
+            threadPool.getThreadContext()
+        );
         StubbableConnectionManager connectionManager = new StubbableConnectionManager(innerConnectionManager);
         connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> {
             final boolean isConnected = connectedNodes.contains(discoveryNode);
@@ -222,7 +229,7 @@ public class PeerFinderTests extends ESTestCase {
         transportService = new TransportService(
             settings,
             capturingTransport,
-            deterministicTaskQueue.getThreadPool(),
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             boundTransportAddress -> localNode,
             null,

+ 33 - 22
server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.TimeValue;
@@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -63,7 +65,7 @@ public class ClusterConnectionManagerTests extends ESTestCase {
         Settings settings = Settings.builder().put("node.name", ClusterConnectionManagerTests.class.getSimpleName()).build();
         threadPool = new ThreadPool(settings);
         transport = mock(Transport.class);
-        connectionManager = new ClusterConnectionManager(settings, transport);
+        connectionManager = new ClusterConnectionManager(settings, transport, threadPool.getThreadContext());
         TimeValue oneSecond = new TimeValue(1000);
         TimeValue oneMinute = TimeValue.timeValueMinutes(1);
         connectionProfile = ConnectionProfile.buildSingleChannelProfile(
@@ -254,6 +256,9 @@ public class ClusterConnectionManagerTests extends ESTestCase {
         int threadCount = between(1, 10);
         Releasable[] releasables = new Releasable[threadCount];
 
+        final ThreadContext threadContext = threadPool.getThreadContext();
+        final String contextHeader = "test-context-header";
+
         CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
         Semaphore pendingCloses = new Semaphore(threadCount);
         for (int i = 0; i < threadCount; i++) {
@@ -265,27 +270,33 @@ public class ClusterConnectionManagerTests extends ESTestCase {
                     throw new RuntimeException(e);
                 }
                 CountDownLatch latch = new CountDownLatch(1);
-                connectionManager.connectToNode(node, connectionProfile, validator, ActionListener.wrap(c -> {
-                    assert connectionManager.nodeConnected(node);
-
-                    assertTrue(pendingCloses.tryAcquire());
-                    connectionManager.getConnection(node).addRemovedListener(ActionListener.wrap(pendingCloses::release));
-
-                    if (randomBoolean()) {
-                        releasables[threadIndex] = c;
-                        nodeConnectedCount.incrementAndGet();
-                    } else {
-                        Releasables.close(c);
-                        nodeClosedCount.incrementAndGet();
-                    }
-
-                    assert latch.getCount() == 1;
-                    latch.countDown();
-                }, e -> {
-                    nodeFailureCount.incrementAndGet();
-                    assert latch.getCount() == 1;
-                    latch.countDown();
-                }));
+                try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
+                    final String contextValue = randomAlphaOfLength(10);
+                    threadContext.putHeader(contextHeader, contextValue);
+                    connectionManager.connectToNode(node, connectionProfile, validator, ActionListener.wrap(c -> {
+                        assert connectionManager.nodeConnected(node);
+                        assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue));
+
+                        assertTrue(pendingCloses.tryAcquire());
+                        connectionManager.getConnection(node).addRemovedListener(ActionListener.wrap(pendingCloses::release));
+
+                        if (randomBoolean()) {
+                            releasables[threadIndex] = c;
+                            nodeConnectedCount.incrementAndGet();
+                        } else {
+                            Releasables.close(c);
+                            nodeClosedCount.incrementAndGet();
+                        }
+
+                        assert latch.getCount() == 1;
+                        latch.countDown();
+                    }, e -> {
+                        assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue));
+                        nodeFailureCount.incrementAndGet();
+                        assert latch.getCount() == 1;
+                        latch.countDown();
+                    }));
+                }
                 try {
                     latch.await();
                 } catch (InterruptedException e) {

+ 35 - 7
server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java

@@ -83,7 +83,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
@@ -127,7 +131,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 int numOfConnections = randomIntBetween(4, 8);
 
                 AtomicBoolean useAddress1 = new AtomicBoolean(true);
@@ -189,7 +197,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
@@ -232,7 +244,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 int numOfConnections = randomIntBetween(4, 8);
 
                 AtomicBoolean useAddress1 = new AtomicBoolean(true);
@@ -295,7 +311,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
@@ -330,7 +350,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
@@ -435,7 +459,11 @@ public class ProxyConnectionStrategyTests extends ESTestCase {
 
                 String address = "localhost:" + address1.getPort();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 int numOfConnections = randomIntBetween(4, 8);
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);

+ 5 - 1
server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.ESTestCase;
 
 import java.net.InetAddress;
@@ -35,7 +36,10 @@ public class RemoteConnectionManagerTests extends ESTestCase {
     public void setUp() throws Exception {
         super.setUp();
         transport = mock(Transport.class);
-        remoteConnectionManager = new RemoteConnectionManager("remote-cluster", new ClusterConnectionManager(Settings.EMPTY, transport));
+        remoteConnectionManager = new RemoteConnectionManager(
+            "remote-cluster",
+            new ClusterConnectionManager(Settings.EMPTY, transport, new ThreadContext(Settings.EMPTY))
+        );
     }
 
     @SuppressWarnings("unchecked")

+ 18 - 3
server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.transport;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.test.ESTestCase;
 
@@ -17,8 +18,14 @@ import static org.mockito.Mockito.mock;
 
 public class RemoteConnectionStrategyTests extends ESTestCase {
 
+    private static final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+
     public void testStrategyChangeMeansThatStrategyMustBeRebuilt() {
-        ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class));
+        final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+            Settings.EMPTY,
+            mock(Transport.class),
+            threadContext
+        );
         RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
         FakeConnectionStrategy first = new FakeConnectionStrategy(
             "cluster-alias",
@@ -34,7 +41,11 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
     }
 
     public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() {
-        ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class));
+        final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+            Settings.EMPTY,
+            mock(Transport.class),
+            threadContext
+        );
         RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
         FakeConnectionStrategy first = new FakeConnectionStrategy(
             "cluster-alias",
@@ -50,7 +61,11 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
     }
 
     public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() {
-        ClusterConnectionManager connectionManager = new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class));
+        final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+            TestProfiles.LIGHT_PROFILE,
+            mock(Transport.class),
+            threadContext
+        );
         assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval());
         assertEquals(Compression.Enabled.INDEXING_DATA, connectionManager.getConnectionProfile().getCompressionEnabled());
         assertEquals(Compression.Scheme.LZ4, connectionManager.getConnectionProfile().getCompressionScheme());

+ 55 - 11
server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java

@@ -123,7 +123,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -172,7 +176,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -220,7 +228,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -277,7 +289,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -316,7 +332,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -358,7 +378,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -405,7 +429,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.acceptIncomingRequests();
 
                 // Predicate excludes seed node as a possible connection
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -454,7 +482,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -522,7 +554,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -611,7 +647,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
 
                 List<String> seedNodes = Collections.singletonList(accessibleNode.toString());
                 TransportAddress proxyAddress = accessibleNode.getAddress();
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(
@@ -659,7 +699,11 @@ public class SniffConnectionStrategyTests extends ESTestCase {
                 localService.start();
                 localService.acceptIncomingRequests();
 
-                ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport);
+                final ClusterConnectionManager connectionManager = new ClusterConnectionManager(
+                    profile,
+                    localService.transport,
+                    threadPool.getThreadContext()
+                );
                 try (
                     RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager);
                     SniffConnectionStrategy strategy = new SniffConnectionStrategy(

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java

@@ -57,7 +57,9 @@ public class MockTransport extends StubbableTransport {
         @Nullable ClusterSettings clusterSettings,
         Set<String> taskHeaders
     ) {
-        StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ClusterConnectionManager(settings, this));
+        final StubbableConnectionManager connectionManager = new StubbableConnectionManager(
+            new ClusterConnectionManager(settings, this, threadPool.getThreadContext())
+        );
         connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> false);
         connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode));
         return new TransportService(

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

@@ -232,7 +232,7 @@ public final class MockTransportService extends TransportService {
             localNodeFactory,
             clusterSettings,
             taskHeaders,
-            new StubbableConnectionManager(new ClusterConnectionManager(settings, transport))
+            new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()))
         );
         this.original = transport.getDelegate();
     }

+ 1 - 1
x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java

@@ -135,7 +135,7 @@ public class TransportSamlInitiateSingleSignOnActionTests extends IdpSamlTestCas
         final TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 1 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java

@@ -190,7 +190,7 @@ public class TransportOpenIdConnectLogoutActionTests extends OpenIdConnectTestCa
         final TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 4 - 3
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.security.action.role.DeleteRoleRequest;
@@ -45,7 +46,7 @@ public class TransportDeleteRoleActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             (x) -> null,
             null,
@@ -82,7 +83,7 @@ public class TransportDeleteRoleActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             (x) -> null,
             null,
@@ -130,7 +131,7 @@ public class TransportDeleteRoleActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             (x) -> null,
             null,

+ 5 - 4
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.security.action.role.GetRolesRequest;
@@ -50,7 +51,7 @@ public class TransportGetRolesActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -110,7 +111,7 @@ public class TransportGetRolesActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -176,7 +177,7 @@ public class TransportGetRolesActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -255,7 +256,7 @@ public class TransportGetRolesActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 6 - 5
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.join.query.HasChildQueryBuilder;
 import org.elasticsearch.join.query.HasParentQueryBuilder;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -87,7 +88,7 @@ public class TransportPutRoleActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -129,7 +130,7 @@ public class TransportPutRoleActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -182,7 +183,7 @@ public class TransportPutRoleActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -232,7 +233,7 @@ public class TransportPutRoleActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -287,7 +288,7 @@ public class TransportPutRoleActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 2 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsRequest;
@@ -51,7 +52,7 @@ public class TransportGetRoleMappingsActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 2 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRequest;
@@ -48,7 +49,7 @@ public class TransportPutRoleMappingActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 1 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java

@@ -261,7 +261,7 @@ public class TransportSamlInvalidateSessionActionTests extends SamlTestCase {
         final TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 1 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java

@@ -221,7 +221,7 @@ public class TransportSamlLogoutActionTests extends SamlTestCase {
         final TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 4 - 3
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.security.SecurityContext;
@@ -62,7 +63,7 @@ public class TransportAuthenticateActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -99,7 +100,7 @@ public class TransportAuthenticateActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -256,7 +257,7 @@ public class TransportAuthenticateActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 6 - 5
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.SecuritySettingsSourceField;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackSettings;
@@ -61,7 +62,7 @@ public class TransportChangePasswordActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -105,7 +106,7 @@ public class TransportChangePasswordActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -167,7 +168,7 @@ public class TransportChangePasswordActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -212,7 +213,7 @@ public class TransportChangePasswordActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -266,7 +267,7 @@ public class TransportChangePasswordActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 6 - 5
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.security.action.user.DeleteUserRequest;
@@ -51,7 +52,7 @@ public class TransportDeleteUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -86,7 +87,7 @@ public class TransportDeleteUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -134,7 +135,7 @@ public class TransportDeleteUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -175,7 +176,7 @@ public class TransportDeleteUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -226,7 +227,7 @@ public class TransportDeleteUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 6 - 6
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java

@@ -94,7 +94,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -141,7 +141,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -207,7 +207,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -268,7 +268,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -329,7 +329,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -393,7 +393,7 @@ public class TransportGetUsersActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 6 - 6
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java

@@ -64,7 +64,7 @@ public class TransportPutUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -100,7 +100,7 @@ public class TransportPutUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -157,7 +157,7 @@ public class TransportPutUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -193,7 +193,7 @@ public class TransportPutUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -243,7 +243,7 @@ public class TransportPutUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -271,7 +271,7 @@ public class TransportPutUserActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            mock(ThreadPool.class),
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,

+ 5 - 5
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java

@@ -70,7 +70,7 @@ public class TransportSetEnabledActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -124,7 +124,7 @@ public class TransportSetEnabledActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -198,7 +198,7 @@ public class TransportSetEnabledActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -267,7 +267,7 @@ public class TransportSetEnabledActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,
@@ -326,7 +326,7 @@ public class TransportSetEnabledActionTests extends ESTestCase {
         TransportService transportService = new TransportService(
             Settings.EMPTY,
             mock(Transport.class),
-            null,
+            threadPool,
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> null,
             null,