Selaa lähdekoodia

Use consistent threadpools in CoordinatorTests (#53868)

Today in the `CoordinatorTests` each node uses multiple threadpools. This is
mostly fine as they are almost completely stateless, except for the
`ThreadContext`: by using multiple threadpools we cannot make assertions that
the thread context is/isn't preserved as we expect. This commit consolidates
the threadpool instances in use so that each node uses just one.
David Turner 5 vuotta sitten
vanhempi
commit
4a663971d5

+ 2 - 0
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

@@ -161,6 +161,7 @@ public class PublicationTransportHandler {
             public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
                                            ActionListener<PublishWithJoinResponse> originalListener) {
                 assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
+                assert transportService.getThreadPool().getThreadContext().isSystemContext();
                 final ActionListener<PublishWithJoinResponse> responseActionListener;
                 if (destination.equals(nodes.getLocalNode())) {
                     // if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
@@ -197,6 +198,7 @@ public class PublicationTransportHandler {
             @Override
             public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
                                         ActionListener<TransportResponse.Empty> responseActionListener) {
+                assert transportService.getThreadPool().getThreadContext().isSystemContext();
                 transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, stateRequestOptions,
                     new TransportResponseHandler<TransportResponse.Empty>() {
 

+ 3 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

@@ -122,9 +122,10 @@ public class NodeJoinTests extends ESTestCase {
     private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
         deterministicTaskQueue
             = new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
+        final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool();
         FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test",
-                deterministicTaskQueue::scheduleNow);
-        setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get());
+            fakeThreadPool, deterministicTaskQueue::scheduleNow);
+        setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get());
         fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
             coordinator.handlePublishRequest(new PublishRequest(event.state()));
             publishListener.onResponse(null);

+ 4 - 4
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1163,15 +1163,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
             TestClusterNode(DiscoveryNode node) throws IOException {
                 this.node = node;
                 final Environment environment = createEnvironment(node.getName());
-                masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
+                threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable));
+                masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow);
                 final Settings settings = environment.settings();
                 final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-                threadPool = deterministicTaskQueue.getThreadPool();
                 clusterService = new ClusterService(settings, clusterSettings, masterService,
                     new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
                         @Override
                         protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
-                            return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
+                            return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
                         }
 
                         @Override
@@ -1211,7 +1211,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     }
                 };
                 transportService = mockTransport.createTransportService(
-                    settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)),
+                    settings, threadPool,
                     new TransportInterceptor() {
                         @Override
                         public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,

+ 14 - 16
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -911,6 +911,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             }
 
             private void setUp() {
+                final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
                 mockTransport = new DisruptableMockTransport(localNode, logger) {
                     @Override
                     protected void execute(Runnable runnable) {
@@ -928,24 +929,20 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                             .filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny();
                     }
                 };
-
                 final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ?
                     nodeSettings : Settings.builder().put(nodeSettings)
                     .putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
                         ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
-                transportService = mockTransport.createTransportService(
-                    settings, deterministicTaskQueue.getThreadPool(this::onNode),
-                    getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
-                    a -> localNode, null, emptySet());
-                masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
+                transportService = mockTransport.createTransportService(settings, threadPool,
+                    getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet());
+                masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool,
                     runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
                 final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
                 clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
-                    deterministicTaskQueue, this::onNode);
+                    deterministicTaskQueue, threadPool);
                 clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
                 clusterService.setNodeConnectionsService(
-                    new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
-                        transportService));
+                    new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
                 final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
                     Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
                 final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
@@ -954,8 +951,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {},
                     getElectionStrategy());
                 masterService.setClusterStatePublisher(coordinator);
-                final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
-                    deterministicTaskQueue.getThreadPool(this::onNode), coordinator, null);
+                final GatewayService gatewayService
+                    = new GatewayService(settings, allocationService, clusterService, threadPool, coordinator, null);
 
                 logger.trace("starting up [{}]", localNode);
                 transportService.start();
@@ -1292,8 +1289,9 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
         AckCollector nextAckCollector = new AckCollector();
 
-        AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
-            super(nodeName, serviceName, onTaskAvailableToRun);
+        AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
+                                         Consumer<Runnable> onTaskAvailableToRun) {
+            super(nodeName, serviceName, threadPool, onTaskAvailableToRun);
         }
 
         @Override
@@ -1323,8 +1321,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         private boolean applicationMayFail;
 
         DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
-                                         DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
-            super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
+                                         DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
+            super(nodeName, settings, clusterSettings, threadPool);
             this.nodeName = nodeName;
             this.deterministicTaskQueue = deterministicTaskQueue;
             addStateApplier(event -> {
@@ -1344,7 +1342,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
         @Override
         protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
-            return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
+            return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
         }
 
         @Override

+ 3 - 2
test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java

@@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination;
 
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.concurrent.TimeUnit;
 
@@ -29,7 +30,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
 
-    public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) {
+    public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
         super(name, 0, 1, 0L, TimeUnit.MILLISECONDS,
             r -> new Thread() {
                 @Override
@@ -51,7 +52,7 @@ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecu
                     });
                 }
             },
-            deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler());
+            threadPool.getThreadContext(), threadPool.scheduler());
     }
 
     @Override

+ 8 - 13
test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java

@@ -41,8 +41,6 @@ import java.util.function.Consumer;
 
 import static org.apache.lucene.util.LuceneTestCase.random;
 import static org.elasticsearch.test.ESTestCase.randomInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class FakeThreadPoolMasterService extends MasterService {
     private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class);
@@ -54,21 +52,14 @@ public class FakeThreadPoolMasterService extends MasterService {
     private boolean taskInProgress = false;
     private boolean waitForPublish = false;
 
-    public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
+    public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
+                                       Consumer<Runnable> onTaskAvailableToRun) {
         super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
-            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
-            createMockThreadPool());
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
         this.name = serviceName;
         this.onTaskAvailableToRun = onTaskAvailableToRun;
     }
 
-    private static ThreadPool createMockThreadPool() {
-        final ThreadContext context = new ThreadContext(Settings.EMPTY);
-        final ThreadPool mockThreadPool = mock(ThreadPool.class);
-        when(mockThreadPool.getThreadContext()).thenReturn(context);
-        return mockThreadPool;
-    }
-
     @Override
     protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
         return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name),
@@ -110,7 +101,11 @@ public class FakeThreadPoolMasterService extends MasterService {
                     final Runnable task = pendingTasks.remove(taskIndex);
                     taskInProgress = true;
                     scheduledNextTask = false;
-                    task.run();
+                    final ThreadContext threadContext = threadPool.getThreadContext();
+                    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
+                        threadContext.markAsSystemContext();
+                        task.run();
+                    }
                     if (waitForPublish == false) {
                         taskInProgress = false;
                     }

+ 1 - 1
test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java

@@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
 
     public void testPrioritizedEsThreadPoolExecutor() {
         final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
-        final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue);
+        final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
         final AtomicBoolean called1 = new AtomicBoolean();
         final AtomicBoolean called2 = new AtomicBoolean();
         executor.execute(new PrioritizedRunnable(Priority.NORMAL) {

+ 9 - 1
test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java

@@ -27,7 +27,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,6 +40,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class FakeThreadPoolMasterServiceTests extends ESTestCase {
 
@@ -48,7 +53,10 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
         lastClusterStateRef.set(ClusterStateCreationUtils.state(discoveryNode, discoveryNode));
         long firstClusterStateVersion = lastClusterStateRef.get().version();
         AtomicReference<ActionListener<Void>> publishingCallback = new AtomicReference<>();
-        FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", runnableTasks::add);
+        final ThreadContext context = new ThreadContext(Settings.EMPTY);
+        final ThreadPool mockThreadPool = mock(ThreadPool.class);
+        when(mockThreadPool.getThreadContext()).thenReturn(context);
+        FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add);
         masterService.setClusterStateSupplier(lastClusterStateRef::get);
         masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
             lastClusterStateRef.set(event.state());