Răsfoiți Sursa

Run `TransportClusterStateAction` on local node (#129872)

This action solely needs the cluster state, it can run on any node.
Since this action is invoked across clusters, we need to be able to
(de)serialize requests and responses. We introduce a new
`RemoteClusterStateRequest` that wraps the existing
`ClusterStateRequest` and implements (de)serialization.
Niels Bauman 3 luni în urmă
părinte
comite
ecbc360f37
53 a modificat fișierele cu 478 adăugiri și 459 ștergeri
  1. 5 0
      docs/changelog/129872.yaml
  2. 2 5
      plugins/discovery-gce/src/internalClusterTest/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java
  3. 2 2
      qa/ccs-unavailable-clusters/src/javaRestTest/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java
  4. 2 1
      rest-api-spec/src/main/resources/rest-api-spec/api/cluster.state.json
  5. 2 58
      server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionDisruptionIT.java
  6. 1 3
      server/src/internalClusterTest/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java
  7. 13 31
      server/src/internalClusterTest/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java
  8. 10 27
      server/src/internalClusterTest/java/org/elasticsearch/cluster/NoMasterNodeIT.java
  9. 1 7
      server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/InitialClusterStateIT.java
  10. 8 32
      server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java
  11. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java
  12. 2 17
      server/src/internalClusterTest/java/org/elasticsearch/discovery/StableMasterDisruptionIT.java
  13. 6 49
      server/src/internalClusterTest/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java
  14. 8 13
      server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java
  15. 2 4
      server/src/internalClusterTest/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java
  16. 1 5
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
  17. 1 4
      server/src/main/java/org/elasticsearch/action/ActionModule.java
  18. 8 36
      server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java
  19. 2 5
      server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java
  20. 232 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/state/RemoteClusterStateRequest.java
  21. 15 16
      server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java
  22. 62 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportRemoteClusterStateAction.java
  23. 5 28
      server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStateAction.java
  24. 2 1
      server/src/main/java/org/elasticsearch/rest/action/cat/RestAllocationAction.java
  25. 2 1
      server/src/main/java/org/elasticsearch/rest/action/cat/RestCatComponentTemplateAction.java
  26. 2 1
      server/src/main/java/org/elasticsearch/rest/action/cat/RestMasterAction.java
  27. 2 1
      server/src/main/java/org/elasticsearch/rest/action/cat/RestNodeAttrsAction.java
  28. 2 1
      server/src/main/java/org/elasticsearch/rest/action/cat/RestPluginsAction.java
  29. 2 1
      server/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java
  30. 2 1
      server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java
  31. 2 3
      server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
  32. 2 2
      server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java
  33. 10 6
      server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java
  34. 4 2
      server/src/test/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionTests.java
  35. 3 3
      server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java
  36. 1 1
      server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java
  37. 3 1
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  38. 5 5
      server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java
  39. 2 2
      server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java
  40. 2 2
      server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java
  41. 3 17
      test/framework/src/main/java/org/elasticsearch/indices/recovery/AbstractIndexRecoveryIntegTestCase.java
  42. 2 29
      test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
  43. 13 4
      test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
  44. 1 1
      test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
  45. 4 4
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java
  46. 2 3
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java
  47. 4 4
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java
  48. 2 2
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java
  49. 2 2
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
  50. 2 2
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseCheckerTests.java
  51. 1 10
      x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java
  52. 2 2
      x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/crossclusteraccess/CrossClusterAccessHeadersForCcsRestIT.java
  53. 1 1
      x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java

+ 5 - 0
docs/changelog/129872.yaml

@@ -0,0 +1,5 @@
+pr: 129872
+summary: Run `TransportClusterStateAction` on local node
+area: Distributed
+type: enhancement
+issues: []

+ 2 - 5
plugins/discovery-gce/src/internalClusterTest/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java

@@ -67,8 +67,7 @@ public class GceDiscoverTests extends ESIntegTestCase {
 
         ClusterStateResponse clusterStateResponse = client(masterNode).admin()
             .cluster()
-            .prepareState(TEST_REQUEST_TIMEOUT)
-            .setMasterNodeTimeout(TimeValue.timeValueSeconds(1))
+            .prepareState(TimeValue.timeValueSeconds(1))
             .clear()
             .setNodes(true)
             .get();
@@ -79,11 +78,9 @@ public class GceDiscoverTests extends ESIntegTestCase {
         registerGceNode(secondNode);
         clusterStateResponse = client(secondNode).admin()
             .cluster()
-            .prepareState(TEST_REQUEST_TIMEOUT)
-            .setMasterNodeTimeout(TimeValue.timeValueSeconds(1))
+            .prepareState(TimeValue.timeValueSeconds(1))
             .clear()
             .setNodes(true)
-            .setLocal(true)
             .get();
         assertNotNull(clusterStateResponse.getState().nodes().getMasterNodeId());
 

+ 2 - 2
qa/ccs-unavailable-clusters/src/javaRestTest/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java

@@ -14,8 +14,8 @@ import org.apache.http.entity.ContentType;
 import org.apache.http.nio.entity.NStringEntity;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchShardsRequest;
 import org.elasticsearch.action.search.SearchShardsResponse;
@@ -117,7 +117,7 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
             newService.registerRequestHandler(
                 ClusterStateAction.NAME,
                 EsExecutors.DIRECT_EXECUTOR_SERVICE,
-                ClusterStateRequest::new,
+                RemoteClusterStateRequest::new,
                 (request, channel, task) -> {
                     DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
                     for (DiscoveryNode node : knownNodes) {

+ 2 - 1
rest-api-spec/src/main/resources/rest-api-spec/api/cluster.state.json

@@ -69,12 +69,13 @@
     },
     "params":{
       "local":{
+        "deprecated":true,
         "type":"boolean",
         "description":"Return local information, do not retrieve the state from master node (default: false)"
       },
       "master_timeout":{
         "type":"time",
-        "description":"Specify timeout for connection to master"
+        "description":"Timeout for waiting for new cluster state in case it is blocked"
       },
       "flat_settings":{
         "type":"boolean",

+ 2 - 58
server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionDisruptionIT.java

@@ -9,7 +9,6 @@
 package org.elasticsearch.action.admin.cluster.state;
 
 import org.elasticsearch.action.support.SubscribableListener;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -18,7 +17,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -47,34 +45,15 @@ public class TransportClusterStateActionDisruptionIT extends ESIntegTestCase {
         return Collections.singletonList(MockTransportService.TestPlugin.class);
     }
 
-    public void testNonLocalRequestAlwaysFindsMaster() throws Exception {
-        runRepeatedlyWhileChangingMaster(() -> {
-            final ClusterStateRequestBuilder clusterStateRequestBuilder = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT)
-                .clear()
-                .setNodes(true)
-                .setBlocks(true)
-                .setMasterNodeTimeout(TimeValue.timeValueMillis(100));
-            final ClusterStateResponse clusterStateResponse;
-            try {
-                clusterStateResponse = clusterStateRequestBuilder.get();
-            } catch (MasterNotDiscoveredException e) {
-                return; // ok, we hit the disconnected node
-            }
-            assertNotNull("should always contain a master node", clusterStateResponse.getState().nodes().getMasterNodeId());
-        });
-    }
-
     public void testLocalRequestAlwaysSucceeds() throws Exception {
         runRepeatedlyWhileChangingMaster(() -> {
             final String node = randomFrom(internalCluster().getNodeNames());
             final DiscoveryNodes discoveryNodes = client(node).admin()
                 .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
+                .prepareState(TimeValue.timeValueMillis(100))
                 .clear()
-                .setLocal(true)
                 .setNodes(true)
                 .setBlocks(true)
-                .setMasterNodeTimeout(TimeValue.timeValueMillis(100))
                 .get()
                 .getState()
                 .nodes();
@@ -87,39 +66,6 @@ public class TransportClusterStateActionDisruptionIT extends ESIntegTestCase {
         });
     }
 
-    public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exception {
-        runRepeatedlyWhileChangingMaster(() -> {
-            final String node = randomFrom(internalCluster().getNodeNames());
-            final long metadataVersion = internalCluster().getInstance(ClusterService.class, node)
-                .getClusterApplierService()
-                .state()
-                .metadata()
-                .version();
-            final long waitForMetadataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
-            final ClusterStateRequestBuilder clusterStateRequestBuilder = client(node).admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .clear()
-                .setNodes(true)
-                .setMetadata(true)
-                .setBlocks(true)
-                .setMasterNodeTimeout(TimeValue.timeValueMillis(100))
-                .setWaitForTimeOut(TimeValue.timeValueMillis(100))
-                .setWaitForMetadataVersion(waitForMetadataVersion);
-            final ClusterStateResponse clusterStateResponse;
-            try {
-                clusterStateResponse = clusterStateRequestBuilder.get();
-            } catch (MasterNotDiscoveredException e) {
-                return; // ok, we hit the disconnected node
-            }
-            if (clusterStateResponse.isWaitForTimedOut() == false) {
-                final ClusterState state = clusterStateResponse.getState();
-                assertNotNull("should always contain a master node", state.nodes().getMasterNodeId());
-                assertThat("waited for metadata version", state.metadata().version(), greaterThanOrEqualTo(waitForMetadataVersion));
-            }
-        });
-    }
-
     public void testLocalRequestWaitsForMetadata() throws Exception {
         runRepeatedlyWhileChangingMaster(() -> {
             final String node = randomFrom(internalCluster().getNodeNames());
@@ -131,13 +77,11 @@ public class TransportClusterStateActionDisruptionIT extends ESIntegTestCase {
             final long waitForMetadataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
             final ClusterStateResponse clusterStateResponse = client(node).admin()
                 .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
+                .prepareState(TimeValue.timeValueMillis(100))
                 .clear()
-                .setLocal(true)
                 .setMetadata(true)
                 .setBlocks(true)
                 .setWaitForMetadataVersion(waitForMetadataVersion)
-                .setMasterNodeTimeout(TimeValue.timeValueMillis(100))
                 .setWaitForTimeOut(TimeValue.timeValueMillis(100))
                 .get();
             if (clusterStateResponse.isWaitForTimedOut() == false) {

+ 1 - 3
server/src/internalClusterTest/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java

@@ -135,9 +135,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
             .execute();
 
         logger.info("--> wait until the cluster state contains the new index");
-        assertBusy(
-            () -> assertTrue(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata().getProject().hasIndex(indexName))
-        );
+        awaitClusterState(state -> state.metadata().getProject().hasIndex(indexName));
 
         logger.info("--> delete the index");
         assertAcked(indicesAdmin().prepareDelete(indexName));

+ 13 - 31
server/src/internalClusterTest/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java

@@ -14,7 +14,6 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExc
 import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
 import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
 import org.elasticsearch.cluster.coordination.NoMasterBlockService;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -68,7 +67,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
         String node1Name = internalCluster().startNode(settings);
 
         logger.info("--> should be blocked, no master...");
-        ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+        ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
         assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
         assertThat(state.nodes().getSize(), equalTo(1)); // verify that we still see the local node in the cluster state
 
@@ -81,9 +80,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
             .get();
         assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
 
-        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-        assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
-        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
         assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
 
         state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
@@ -123,12 +120,9 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
         Settings masterDataPathSettings = internalCluster().dataPathSettings(masterNode);
         internalCluster().stopNode(masterNode);
 
-        assertBusy(() -> {
-            ClusterState clusterState = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-            assertTrue(clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
-        });
+        awaitClusterState(otherNode, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
 
-        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
         assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
         // verify that both nodes are still in the cluster state but there is no master
         assertThat(state.nodes().getSize(), equalTo(2));
@@ -144,9 +138,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
             .get();
         assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
 
-        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-        assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
-        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
         assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
 
         state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
@@ -176,10 +168,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
         Settings otherNodeDataPathSettings = internalCluster().dataPathSettings(otherNode);
         internalCluster().stopNode(otherNode);
 
-        assertBusy(() -> {
-            ClusterState state1 = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-            assertThat(state1.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
-        });
+        awaitClusterState(masterNode, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
 
         logger.info("--> starting the previous master node again...");
         internalCluster().startNode(Settings.builder().put(settings).put(otherNodeDataPathSettings).build());
@@ -192,9 +181,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
             .get();
         assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
 
-        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-        assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
-        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+        state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
         assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(false));
 
         state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
@@ -220,12 +207,9 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
 
         ClusterState state;
 
-        assertBusy(() -> {
-            for (Client client : clients()) {
-                ClusterState state1 = client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-                assertThat(state1.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
-            }
-        });
+        for (var node : internalCluster().getNodeNames()) {
+            awaitClusterState(node, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
+        }
 
         logger.info("--> start one more node");
         internalCluster().startNode(settings);
@@ -261,8 +245,9 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
             assertHitCount(prepareSearch().setSize(0).setQuery(QueryBuilders.matchAllQuery()), 100);
         }
 
+        final var masterName = internalCluster().getMasterName();
         List<String> nonMasterNodes = new ArrayList<>(
-            Sets.difference(Sets.newHashSet(internalCluster().getNodeNames()), Collections.singleton(internalCluster().getMasterName()))
+            Sets.difference(Sets.newHashSet(internalCluster().getNodeNames()), Collections.singleton(masterName))
         );
         Settings nonMasterDataPathSettings1 = internalCluster().dataPathSettings(nonMasterNodes.get(0));
         Settings nonMasterDataPathSettings2 = internalCluster().dataPathSettings(nonMasterNodes.get(1));
@@ -271,10 +256,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
 
         logger.info("--> verify that there is no master anymore on remaining node");
         // spin here to wait till the state is set
-        assertBusy(() -> {
-            ClusterState st = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-            assertThat(st.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
-        });
+        awaitClusterState(masterName, clusterState -> clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
 
         logger.info("--> start back the 2 nodes ");
         internalCluster().startNodes(nonMasterDataPathSettings1, nonMasterDataPathSettings2);

+ 10 - 27
server/src/internalClusterTest/java/org/elasticsearch/cluster/NoMasterNodeIT.java

@@ -82,17 +82,10 @@ public class NoMasterNodeIT extends ESIntegTestCase {
         internalCluster().setDisruptionScheme(disruptionScheme);
         disruptionScheme.startDisrupting();
 
-        final Client clientToMasterlessNode = client();
-
-        assertBusy(() -> {
-            ClusterState state = clientToMasterlessNode.admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState();
-            assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
-        });
+        final String masterlessNode = internalCluster().getRandomNodeName();
+        final Client clientToMasterlessNode = client(masterlessNode);
+
+        awaitClusterState(masterlessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
 
         assertRequestBuilderThrows(
             clientToMasterlessNode.prepareGet("test", "1"),
@@ -246,17 +239,10 @@ public class NoMasterNodeIT extends ESIntegTestCase {
         internalCluster().setDisruptionScheme(disruptionScheme);
         disruptionScheme.startDisrupting();
 
-        final Client clientToMasterlessNode = client();
+        final String masterlessNode = internalCluster().getRandomNodeName();
+        final Client clientToMasterlessNode = client(masterlessNode);
 
-        assertBusy(() -> {
-            ClusterState state = clientToMasterlessNode.admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState();
-            assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
-        });
+        awaitClusterState(masterlessNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
 
         GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "1").get();
         assertExists(getResponse);
@@ -346,12 +332,9 @@ public class NoMasterNodeIT extends ESIntegTestCase {
         internalCluster().setDisruptionScheme(disruptionScheme);
         disruptionScheme.startDisrupting();
 
-        assertBusy(() -> {
-            for (String node : nodesWithShards) {
-                ClusterState state = client(node).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-                assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
-            }
-        });
+        for (String node : nodesWithShards) {
+            awaitClusterState(node, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
+        }
 
         GetResponse getResponse = client(randomFrom(nodesWithShards)).prepareGet("test1", "1").get();
         assertExists(getResponse);

+ 1 - 7
server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/InitialClusterStateIT.java

@@ -34,13 +34,7 @@ public class InitialClusterStateIT extends ESIntegTestCase {
 
     private static void assertClusterUuid(boolean expectCommitted, String expectedValue) {
         for (String nodeName : internalCluster().getNodeNames()) {
-            final Metadata metadata = client(nodeName).admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState()
-                .metadata();
+            final Metadata metadata = client(nodeName).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().metadata();
             assertEquals(expectCommitted, metadata.clusterUUIDCommitted());
             assertEquals(expectedValue, metadata.clusterUUID());
 

+ 8 - 32
server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java

@@ -137,10 +137,7 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
                 .put(Node.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s") // to ensure quick node startup
                 .build()
         );
-        assertBusy(() -> {
-            ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-            assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
-        });
+        awaitClusterState(node, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
 
         Settings dataPathSettings = internalCluster().dataPathSettings(node);
 
@@ -242,16 +239,7 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
         internalCluster().stopNode(masterNodes.get(2));
 
         logger.info("--> ensure NO_MASTER_BLOCK on data-only node");
-        assertBusy(() -> {
-            ClusterState state = internalCluster().client(dataNode)
-                .admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState();
-            assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
-        });
+        awaitClusterState(dataNode, state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
 
         logger.info("--> try to unsafely bootstrap 1st master-eligible node, while node lock is held");
         Environment environmentMaster1 = TestEnvironment.newEnvironment(
@@ -294,17 +282,11 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
         String dataNode2 = internalCluster().startDataOnlyNode(dataNodeDataPathSettings);
 
         logger.info("--> ensure there is no NO_MASTER_BLOCK and unsafe-bootstrap is reflected in cluster state");
-        assertBusy(() -> {
-            ClusterState state = internalCluster().client(dataNode2)
-                .admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState();
-            assertFalse(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
-            assertTrue(state.metadata().persistentSettings().getAsBoolean(UnsafeBootstrapMasterCommand.UNSAFE_BOOTSTRAP.getKey(), false));
-        });
+        awaitClusterState(
+            dataNode2,
+            state -> state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID) == false
+                && state.metadata().persistentSettings().getAsBoolean(UnsafeBootstrapMasterCommand.UNSAFE_BOOTSTRAP.getKey(), false)
+        );
 
         logger.info("--> ensure index test is green");
         ensureGreen("test");
@@ -346,13 +328,7 @@ public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase {
                 .build()
         );
 
-        ClusterState state = internalCluster().client()
-            .admin()
-            .cluster()
-            .prepareState(TEST_REQUEST_TIMEOUT)
-            .setLocal(true)
-            .get()
-            .getState();
+        ClusterState state = internalCluster().client().admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
         assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
 
         internalCluster().stopNode(node);

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java

@@ -66,7 +66,7 @@ public class ShardStateIT extends ESIntegTestCase {
     protected void assertPrimaryTerms(long shard0Term, long shard1Term) {
         for (String node : internalCluster().getNodeNames()) {
             logger.debug("--> asserting primary terms terms on [{}]", node);
-            ClusterState state = client(node).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+            ClusterState state = client(node).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
             IndexMetadata metadata = state.metadata().getProject().index("test");
             assertThat(metadata.primaryTerm(0), equalTo(shard0Term));
             assertThat(metadata.primaryTerm(1), equalTo(shard1Term));

+ 2 - 17
server/src/internalClusterTest/java/org/elasticsearch/discovery/StableMasterDisruptionIT.java

@@ -9,7 +9,6 @@
 
 package org.elasticsearch.discovery;
 
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
@@ -103,7 +102,7 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
         // The unlucky node must report *no* master node, since it can't connect to master and in fact it should
         // continuously ping until network failures have been resolved. However
         // It may a take a bit before the node detects it has been cut off from the elected master
-        ensureNoMaster(unluckyNode);
+        awaitMasterNotFound(unluckyNode);
         // because it has had a master within the last 30s:
         assertGreenMasterStability(internalCluster().client(unluckyNode));
 
@@ -144,20 +143,6 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
         return BytesReference.bytes(builder).utf8ToString();
     }
 
-    private void ensureNoMaster(String node) throws Exception {
-        assertBusy(
-            () -> assertNull(
-                client(node).admin()
-                    .cluster()
-                    .state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).local(true))
-                    .get()
-                    .getState()
-                    .nodes()
-                    .getMasterNode()
-            )
-        );
-    }
-
     /**
      * Verify that nodes fault detection detects a disconnected node after master reelection
      */
@@ -209,7 +194,7 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
 
         logger.info("--> waiting for master to remove it");
         ensureStableCluster(2, master);
-        ensureNoMaster(isolatedNode);
+        awaitMasterNotFound(isolatedNode);
 
         networkDisruption.stopDisrupting();
         ensureStableCluster(3);

+ 6 - 49
server/src/internalClusterTest/java/org/elasticsearch/gateway/RecoverAfterNodesIT.java

@@ -37,7 +37,6 @@ public class RecoverAfterNodesIT extends ESIntegTestCase {
             blocks = nodeClient.admin()
                 .cluster()
                 .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
                 .get()
                 .getState()
                 .blocks()
@@ -56,75 +55,33 @@ public class RecoverAfterNodesIT extends ESIntegTestCase {
         logger.info("--> start master_node (1)");
         Client master1 = startNode(Settings.builder().put(RECOVER_AFTER_DATA_NODES_SETTING.getKey(), 2).put(masterOnlyNode()));
         assertThat(
-            master1.admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState()
-                .blocks()
-                .global(ClusterBlockLevel.METADATA_WRITE),
+            master1.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
             hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)
         );
 
         logger.info("--> start data_node (1)");
         Client data1 = startNode(Settings.builder().put(RECOVER_AFTER_DATA_NODES_SETTING.getKey(), 2).put(dataOnlyNode()));
         assertThat(
-            master1.admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState()
-                .blocks()
-                .global(ClusterBlockLevel.METADATA_WRITE),
+            master1.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
             hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)
         );
         assertThat(
-            data1.admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState()
-                .blocks()
-                .global(ClusterBlockLevel.METADATA_WRITE),
+            data1.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
             hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)
         );
 
         logger.info("--> start master_node (2)");
         Client master2 = startNode(Settings.builder().put(RECOVER_AFTER_DATA_NODES_SETTING.getKey(), 2).put(masterOnlyNode()));
         assertThat(
-            master2.admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState()
-                .blocks()
-                .global(ClusterBlockLevel.METADATA_WRITE),
+            master2.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
             hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)
         );
         assertThat(
-            data1.admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState()
-                .blocks()
-                .global(ClusterBlockLevel.METADATA_WRITE),
+            data1.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
             hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)
         );
         assertThat(
-            master2.admin()
-                .cluster()
-                .prepareState(TEST_REQUEST_TIMEOUT)
-                .setLocal(true)
-                .get()
-                .getState()
-                .blocks()
-                .global(ClusterBlockLevel.METADATA_WRITE),
+            master2.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
             hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK)
         );
 

+ 8 - 13
server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java

@@ -22,7 +22,6 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.internal.Client;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
@@ -414,19 +413,15 @@ public class RelocationIT extends ESIntegTestCase {
         updateClusterSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none"));
 
         logger.info("--> wait for all replica shards to be removed, on all nodes");
-        assertBusy(() -> {
-            for (String node : internalCluster().getNodeNames()) {
-                if (node.equals(p_node)) {
-                    continue;
-                }
-                ClusterState state = client(node).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
-                assertThat(
-                    node + " indicates assigned replicas",
-                    state.getRoutingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size(),
-                    equalTo(1)
-                );
+        for (String node : internalCluster().getNodeNames()) {
+            if (node.equals(p_node)) {
+                continue;
             }
-        });
+            awaitClusterState(
+                node,
+                state -> state.getRoutingTable().index(indexName).shardsWithState(ShardRoutingState.UNASSIGNED).size() == 1
+            );
+        }
 
         logger.info("--> verifying no temporary recoveries are left");
         for (String node : internalCluster().getNodeNames()) {

+ 2 - 4
server/src/internalClusterTest/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java

@@ -13,7 +13,6 @@ import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.ShardSearchFailure;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.routing.RoutingNodesHelper;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -120,10 +119,9 @@ public class SearchRedStateIndexIT extends ESIntegTestCase {
 
         clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForStatus(ClusterHealthStatus.RED).get();
 
-        assertBusy(() -> {
-            ClusterState state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
+        awaitClusterState(state -> {
             List<ShardRouting> unassigneds = RoutingNodesHelper.shardsWithState(state.getRoutingNodes(), ShardRoutingState.UNASSIGNED);
-            assertThat(unassigneds.size(), greaterThan(0));
+            return unassigneds.isEmpty() == false;
         });
 
     }

+ 1 - 5
server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

@@ -62,7 +62,6 @@ import org.elasticsearch.test.disruption.BusyMasterServiceDisruption;
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.rest.FakeRestRequest;
 import org.elasticsearch.test.transport.MockTransportService;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportMessageListener;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
@@ -524,10 +523,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
             throw getRepoError.get();
         }
 
-        RestClusterStateAction clusterStateAction = new RestClusterStateAction(
-            internalCluster().getInstance(SettingsFilter.class),
-            internalCluster().getInstance(ThreadPool.class)
-        );
+        RestClusterStateAction clusterStateAction = new RestClusterStateAction(internalCluster().getInstance(SettingsFilter.class));
         RestRequest clusterStateRequest = new FakeRestRequest();
         final CountDownLatch clusterStateLatch = new CountDownLatch(1);
         final AtomicReference<AssertionError> clusterStateError = new AtomicReference<>();

+ 1 - 4
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -455,7 +455,6 @@ public class ActionModule extends AbstractModule {
     private final Set<RestHeaderDefinition> headersToCopy;
     private final RequestValidators<PutMappingRequest> mappingRequestValidators;
     private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
-    private final ThreadPool threadPool;
     private final ReservedClusterStateService reservedClusterStateService;
     private final RestExtension restExtension;
 
@@ -488,7 +487,6 @@ public class ActionModule extends AbstractModule {
         this.clusterSettings = clusterSettings;
         this.settingsFilter = settingsFilter;
         this.actionPlugins = actionPlugins;
-        this.threadPool = threadPool;
         actions = setupActions(actionPlugins);
         actionFilters = setupActionFilters(actionPlugins);
         this.bulkService = bulkService;
@@ -855,7 +853,7 @@ public class ActionModule extends AbstractModule {
         registerHandler.accept(new RestGetDesiredBalanceAction());
         registerHandler.accept(new RestDeleteDesiredBalanceAction());
         registerHandler.accept(new RestClusterStatsAction());
-        registerHandler.accept(new RestClusterStateAction(settingsFilter, threadPool));
+        registerHandler.accept(new RestClusterStateAction(settingsFilter));
         registerHandler.accept(new RestClusterHealthAction());
         registerHandler.accept(new RestClusterUpdateSettingsAction());
         registerHandler.accept(new RestClusterGetSettingsAction(settings, clusterSettings, settingsFilter));
@@ -1061,7 +1059,6 @@ public class ActionModule extends AbstractModule {
             bind(action.getTransportAction()).asEagerSingleton();
             transportActionsBinder.addBinding(action.getAction()).to(action.getTransportAction()).asEagerSingleton();
         }
-
     }
 
     public ActionFilters getActionFilters() {

+ 8 - 36
server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java

@@ -12,20 +12,21 @@ package org.elasticsearch.action.admin.cluster.state;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.action.support.local.LocalClusterStateRequest;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 
-public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateRequest> implements IndicesRequest.Replaceable {
+/**
+ * A local-only request for obtaining (parts of) the cluster state. {@link RemoteClusterStateRequest} can be used for obtaining cluster
+ * states from remote clusters.
+ */
+public class ClusterStateRequest extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
 
     public static final TimeValue DEFAULT_WAIT_FOR_NODE_TIMEOUT = TimeValue.timeValueMinutes(1);
 
@@ -43,33 +44,6 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
         super(masterNodeTimeout);
     }
 
-    public ClusterStateRequest(StreamInput in) throws IOException {
-        super(in);
-        routingTable = in.readBoolean();
-        nodes = in.readBoolean();
-        metadata = in.readBoolean();
-        blocks = in.readBoolean();
-        customs = in.readBoolean();
-        indices = in.readStringArray();
-        indicesOptions = IndicesOptions.readIndicesOptions(in);
-        waitForTimeout = in.readTimeValue();
-        waitForMetadataVersion = in.readOptionalLong();
-    }
-
-    @Override
-    public void writeTo(StreamOutput out) throws IOException {
-        super.writeTo(out);
-        out.writeBoolean(routingTable);
-        out.writeBoolean(nodes);
-        out.writeBoolean(metadata);
-        out.writeBoolean(blocks);
-        out.writeBoolean(customs);
-        out.writeStringArray(indices);
-        indicesOptions.writeIndicesOptions(out);
-        out.writeTimeValue(waitForTimeout);
-        out.writeOptionalLong(waitForMetadataVersion);
-    }
-
     @Override
     public ActionRequestValidationException validate() {
         return null;
@@ -212,9 +186,7 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
         if (customs) {
             stringBuilder.append("customs, ");
         }
-        if (local) {
-            stringBuilder.append("local, ");
-        }
+        stringBuilder.append("local, ");
         if (waitForMetadataVersion != null) {
             stringBuilder.append("wait for metadata version [")
                 .append(waitForMetadataVersion)
@@ -225,7 +197,7 @@ public class ClusterStateRequest extends MasterNodeReadRequest<ClusterStateReque
         if (indices.length > 0) {
             stringBuilder.append("indices ").append(Arrays.toString(indices)).append(", ");
         }
-        stringBuilder.append("master timeout [").append(masterNodeTimeout()).append("]]");
+        stringBuilder.append("master timeout [").append(masterTimeout()).append("]]");
         return stringBuilder.toString();
     }
 

+ 2 - 5
server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java

@@ -9,16 +9,13 @@
 
 package org.elasticsearch.action.admin.cluster.state;
 
+import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
 import org.elasticsearch.client.internal.ElasticsearchClient;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.core.TimeValue;
 
-public class ClusterStateRequestBuilder extends MasterNodeReadOperationRequestBuilder<
-    ClusterStateRequest,
-    ClusterStateResponse,
-    ClusterStateRequestBuilder> {
+public class ClusterStateRequestBuilder extends ActionRequestBuilder<ClusterStateRequest, ClusterStateResponse> {
 
     public ClusterStateRequestBuilder(ElasticsearchClient client, TimeValue masterNodeTimeout) {
         super(client, ClusterStateAction.INSTANCE, new ClusterStateRequest(masterNodeTimeout));

+ 232 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/state/RemoteClusterStateRequest.java

@@ -0,0 +1,232 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.action.admin.cluster.state;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.IndicesRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * A remote-only version of {@link ClusterStateRequest} that should be used for cross-cluster requests.
+ * It simply exists to handle incoming remote requests and forward them to the local transport action.
+ */
+public class RemoteClusterStateRequest extends MasterNodeReadRequest<RemoteClusterStateRequest> implements IndicesRequest.Replaceable {
+
+    private boolean routingTable = true;
+    private boolean nodes = true;
+    private boolean metadata = true;
+    private boolean blocks = true;
+    private boolean customs = true;
+    private Long waitForMetadataVersion;
+    private TimeValue waitForTimeout = ClusterStateRequest.DEFAULT_WAIT_FOR_NODE_TIMEOUT;
+    private String[] indices = Strings.EMPTY_ARRAY;
+    private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
+
+    public RemoteClusterStateRequest(TimeValue masterNodeTimeout) {
+        super(masterNodeTimeout);
+    }
+
+    public RemoteClusterStateRequest(StreamInput in) throws IOException {
+        super(in);
+        routingTable = in.readBoolean();
+        nodes = in.readBoolean();
+        metadata = in.readBoolean();
+        blocks = in.readBoolean();
+        customs = in.readBoolean();
+        indices = in.readStringArray();
+        indicesOptions = IndicesOptions.readIndicesOptions(in);
+        waitForTimeout = in.readTimeValue();
+        waitForMetadataVersion = in.readOptionalLong();
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+        // We defer validation to `ClusterStateRequest`, which will run on the local node.
+        return null;
+    }
+
+    public RemoteClusterStateRequest all() {
+        routingTable = true;
+        nodes = true;
+        metadata = true;
+        blocks = true;
+        customs = true;
+        indices = Strings.EMPTY_ARRAY;
+        return this;
+    }
+
+    public RemoteClusterStateRequest clear() {
+        routingTable = false;
+        nodes = false;
+        metadata = false;
+        blocks = false;
+        customs = false;
+        indices = Strings.EMPTY_ARRAY;
+        return this;
+    }
+
+    public boolean routingTable() {
+        return routingTable;
+    }
+
+    public RemoteClusterStateRequest routingTable(boolean routingTable) {
+        this.routingTable = routingTable;
+        return this;
+    }
+
+    public boolean nodes() {
+        return nodes;
+    }
+
+    public RemoteClusterStateRequest nodes(boolean nodes) {
+        this.nodes = nodes;
+        return this;
+    }
+
+    public boolean metadata() {
+        return metadata;
+    }
+
+    public RemoteClusterStateRequest metadata(boolean metadata) {
+        this.metadata = metadata;
+        return this;
+    }
+
+    public boolean blocks() {
+        return blocks;
+    }
+
+    public RemoteClusterStateRequest blocks(boolean blocks) {
+        this.blocks = blocks;
+        return this;
+    }
+
+    @Override
+    public String[] indices() {
+        return indices;
+    }
+
+    @Override
+    public RemoteClusterStateRequest indices(String... indices) {
+        this.indices = indices;
+        return this;
+    }
+
+    @Override
+    public IndicesOptions indicesOptions() {
+        return this.indicesOptions;
+    }
+
+    public final RemoteClusterStateRequest indicesOptions(IndicesOptions indicesOptions) {
+        this.indicesOptions = indicesOptions;
+        return this;
+    }
+
+    @Override
+    public boolean includeDataStreams() {
+        return true;
+    }
+
+    public RemoteClusterStateRequest customs(boolean customs) {
+        this.customs = customs;
+        return this;
+    }
+
+    public boolean customs() {
+        return customs;
+    }
+
+    public TimeValue waitForTimeout() {
+        return waitForTimeout;
+    }
+
+    public RemoteClusterStateRequest waitForTimeout(TimeValue waitForTimeout) {
+        this.waitForTimeout = waitForTimeout;
+        return this;
+    }
+
+    public Long waitForMetadataVersion() {
+        return waitForMetadataVersion;
+    }
+
+    public RemoteClusterStateRequest waitForMetadataVersion(long waitForMetadataVersion) {
+        if (waitForMetadataVersion < 1) {
+            throw new IllegalArgumentException(
+                "provided waitForMetadataVersion should be >= 1, but instead is [" + waitForMetadataVersion + "]"
+            );
+        }
+        this.waitForMetadataVersion = waitForMetadataVersion;
+        return this;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeBoolean(routingTable);
+        out.writeBoolean(nodes);
+        out.writeBoolean(metadata);
+        out.writeBoolean(blocks);
+        out.writeBoolean(customs);
+        out.writeStringArray(indices);
+        indicesOptions.writeIndicesOptions(out);
+        out.writeTimeValue(waitForTimeout);
+        out.writeOptionalLong(waitForMetadataVersion);
+    }
+
+    @Override
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
+    }
+
+    @Override
+    public String getDescription() {
+        final StringBuilder stringBuilder = new StringBuilder("remote cluster state [");
+        if (routingTable) {
+            stringBuilder.append("routing table, ");
+        }
+        if (nodes) {
+            stringBuilder.append("nodes, ");
+        }
+        if (metadata) {
+            stringBuilder.append("metadata, ");
+        }
+        if (blocks) {
+            stringBuilder.append("blocks, ");
+        }
+        if (customs) {
+            stringBuilder.append("customs, ");
+        }
+        if (waitForMetadataVersion != null) {
+            stringBuilder.append("wait for metadata version [")
+                .append(waitForMetadataVersion)
+                .append("] with timeout [")
+                .append(waitForTimeout)
+                .append("], ");
+        }
+        if (indices.length > 0) {
+            stringBuilder.append("indices ").append(Arrays.toString(indices)).append(", ");
+        }
+        stringBuilder.append("master timeout [").append(masterNodeTimeout()).append("]]");
+        return stringBuilder.toString();
+
+    }
+}

+ 15 - 16
server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

@@ -14,7 +14,8 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.NotMasterException;
@@ -50,12 +51,13 @@ import java.util.Set;
 import java.util.function.BiPredicate;
 import java.util.function.Predicate;
 
-public class TransportClusterStateAction extends TransportMasterNodeReadAction<ClusterStateRequest, ClusterStateResponse> {
+public class TransportClusterStateAction extends TransportLocalClusterStateAction<ClusterStateRequest, ClusterStateResponse> {
 
     private static final Logger logger = LogManager.getLogger(TransportClusterStateAction.class);
 
     private final ProjectResolver projectResolver;
     private final IndexNameExpressionResolver indexNameExpressionResolver;
+    private final ThreadPool threadPool;
 
     @Inject
     public TransportClusterStateAction(
@@ -64,21 +66,22 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
         ThreadPool threadPool,
         ActionFilters actionFilters,
         IndexNameExpressionResolver indexNameExpressionResolver,
-        ProjectResolver projectResolver
+        ProjectResolver projectResolver,
+        Client client
     ) {
         super(
             ClusterStateAction.NAME,
-            false,
-            transportService,
-            clusterService,
-            threadPool,
             actionFilters,
-            ClusterStateRequest::new,
-            ClusterStateResponse::new,
+            transportService.getTaskManager(),
+            clusterService,
             threadPool.executor(ThreadPool.Names.MANAGEMENT)
         );
         this.projectResolver = projectResolver;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
+        this.threadPool = threadPool;
+
+        // construct to register with TransportService
+        new TransportRemoteClusterStateAction(transportService, threadPool, actionFilters, client);
     }
 
     @Override
@@ -91,7 +94,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
     }
 
     @Override
-    protected void masterOperation(
+    protected void localClusterStateOperation(
         Task task,
         final ClusterStateRequest request,
         final ClusterState state,
@@ -105,17 +108,13 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
             ? Predicates.always()
             : clusterState -> clusterState.metadata().version() >= request.waitForMetadataVersion();
 
-        final Predicate<ClusterState> acceptableClusterStateOrFailedPredicate = request.local()
-            ? acceptableClusterStatePredicate
-            : acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false);
-
         if (cancellableTask.notifyIfCancelled(listener)) {
             return;
         }
         if (acceptableClusterStatePredicate.test(state)) {
             ActionListener.completeWith(listener, () -> buildResponse(request, state));
         } else {
-            assert acceptableClusterStateOrFailedPredicate.test(state) == false;
+            assert acceptableClusterStatePredicate.test(state) == false;
             new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext())
                 .waitForNextChange(new ClusterStateObserver.Listener() {
 
@@ -149,7 +148,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
                             }
                         });
                     }
-                }, clusterState -> cancellableTask.isCancelled() || acceptableClusterStateOrFailedPredicate.test(clusterState));
+                }, clusterState -> cancellableTask.isCancelled() || acceptableClusterStatePredicate.test(clusterState));
         }
     }
 

+ 62 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportRemoteClusterStateAction.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.action.admin.cluster.state;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+/**
+ * A remote-only version of {@link TransportClusterStateAction} that should be used for cross-cluster requests.
+ * It simply exists to handle incoming remote requests and forward them to the local transport action.
+ */
+public class TransportRemoteClusterStateAction extends HandledTransportAction<RemoteClusterStateRequest, ClusterStateResponse> {
+
+    private final Client client;
+
+    @Inject
+    public TransportRemoteClusterStateAction(
+        TransportService transportService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        Client client
+    ) {
+        super(
+            ClusterStateAction.NAME,
+            transportService,
+            actionFilters,
+            RemoteClusterStateRequest::new,
+            threadPool.executor(ThreadPool.Names.MANAGEMENT)
+        );
+        this.client = client;
+    }
+
+    @Override
+    protected void doExecute(Task task, RemoteClusterStateRequest request, ActionListener<ClusterStateResponse> listener) {
+        final ClusterStateRequest localRequest = new ClusterStateRequest(request.masterNodeTimeout());
+        localRequest.routingTable(request.routingTable());
+        localRequest.nodes(request.nodes());
+        localRequest.metadata(request.metadata());
+        localRequest.blocks(request.blocks());
+        localRequest.customs(request.customs());
+        if (request.waitForMetadataVersion() != null) {
+            localRequest.waitForMetadataVersion(request.waitForMetadataVersion());
+        }
+        localRequest.waitForTimeout(request.waitForTimeout());
+        localRequest.indices(request.indices());
+        localRequest.indicesOptions(request.indicesOptions());
+        client.execute(ClusterStateAction.INSTANCE, localRequest, listener);
+    }
+}

+ 5 - 28
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStateAction.java

@@ -9,7 +9,6 @@
 
 package org.elasticsearch.rest.action.admin.cluster;
 
-import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -26,11 +25,11 @@ import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
 import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestChunkedToXContentListener;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.ToXContent;
 
 import java.io.IOException;
@@ -40,7 +39,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.LongSupplier;
 
 import static org.elasticsearch.common.util.set.Sets.addToCopy;
 import static org.elasticsearch.rest.RestRequest.Method.GET;
@@ -53,11 +51,8 @@ public class RestClusterStateAction extends BaseRestHandler {
 
     private final SettingsFilter settingsFilter;
 
-    private final ThreadPool threadPool;
-
-    public RestClusterStateAction(SettingsFilter settingsFilter, ThreadPool threadPool) {
+    public RestClusterStateAction(SettingsFilter settingsFilter) {
         this.settingsFilter = settingsFilter;
-        this.threadPool = threadPool;
     }
 
     @Override
@@ -83,7 +78,7 @@ public class RestClusterStateAction extends BaseRestHandler {
     public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
         final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
         clusterStateRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterStateRequest.indicesOptions()));
-        clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
         if (request.hasParam("wait_for_metadata_version")) {
             clusterStateRequest.waitForMetadataVersion(request.paramAsLong("wait_for_metadata_version", 0));
         }
@@ -125,7 +120,7 @@ public class RestClusterStateAction extends BaseRestHandler {
             ClusterStateAction.INSTANCE,
             clusterStateRequest,
             new RestChunkedToXContentListener<RestClusterStateResponse>(channel, new ToXContent.DelegatingMapParams(params, request)).map(
-                response -> new RestClusterStateResponse(clusterStateRequest, response, threadPool.relativeTimeInMillisSupplier())
+                response -> new RestClusterStateResponse(clusterStateRequest, response)
             )
         );
     }
@@ -145,28 +140,10 @@ public class RestClusterStateAction extends BaseRestHandler {
         static final String CLUSTER_NAME = "cluster_name";
     }
 
-    private static class RestClusterStateResponse implements ChunkedToXContent {
-
-        private final ClusterStateRequest request;
-        private final ClusterStateResponse response;
-        private final LongSupplier currentTimeMillisSupplier;
-        private final long startTimeMillis;
-
-        RestClusterStateResponse(ClusterStateRequest request, ClusterStateResponse response, LongSupplier currentTimeMillisSupplier) {
-            this.request = request;
-            this.response = response;
-            this.currentTimeMillisSupplier = currentTimeMillisSupplier;
-            this.startTimeMillis = currentTimeMillisSupplier.getAsLong();
-        }
+    private record RestClusterStateResponse(ClusterStateRequest request, ClusterStateResponse response) implements ChunkedToXContent {
 
         @Override
         public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
-            if (request.local() == false
-                && request.masterNodeTimeout().millis() >= 0
-                && currentTimeMillisSupplier.getAsLong() - startTimeMillis > request.masterNodeTimeout().millis()) {
-                throw new ElasticsearchTimeoutException("Timed out getting cluster state");
-            }
-
             final ClusterState responseState = response.getState();
 
             return Iterators.concat(Iterators.single((builder, params) -> {

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/cat/RestAllocationAction.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.Table;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestActionListener;
@@ -62,7 +63,7 @@ public class RestAllocationAction extends AbstractCatAction {
         final String[] nodes = Strings.splitStringByCommaToArray(request.param("nodes", "data:true"));
         final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
         clusterStateRequest.clear().routingTable(true);
-        clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
 
         return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
             @Override

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/cat/RestCatComponentTemplateAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestResponseListener;
@@ -86,7 +87,7 @@ public class RestCatComponentTemplateAction extends AbstractCatAction {
         final String matchPattern = request.hasParam("name") ? request.param("name") : null;
         final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
         clusterStateRequest.clear().metadata(true);
-        clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
         return channel -> client.admin().cluster().state(clusterStateRequest, new RestResponseListener<>(channel) {
             @Override
             public RestResponse buildResponse(ClusterStateResponse clusterStateResponse) throws Exception {

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/cat/RestMasterAction.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Table;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestResponseListener;
@@ -48,7 +49,7 @@ public class RestMasterAction extends AbstractCatAction {
     public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
         final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
         clusterStateRequest.clear().nodes(true);
-        clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
 
         return channel -> client.admin().cluster().state(clusterStateRequest, new RestResponseListener<ClusterStateResponse>(channel) {
             @Override

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/cat/RestNodeAttrsAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.common.Table;
 import org.elasticsearch.monitor.process.ProcessInfo;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestActionListener;
@@ -56,7 +57,7 @@ public class RestNodeAttrsAction extends AbstractCatAction {
     public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
         final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
         clusterStateRequest.clear().nodes(true);
-        clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
 
         return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
             @Override

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/cat/RestPluginsAction.java

@@ -24,6 +24,7 @@ import org.elasticsearch.plugins.PluginDescriptor;
 import org.elasticsearch.plugins.PluginRuntimeInfo;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestActionListener;
@@ -57,7 +58,7 @@ public class RestPluginsAction extends AbstractCatAction {
         final boolean includeBootstrapPlugins = request.paramAsBoolean("include_bootstrap", false);
         final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
         clusterStateRequest.clear().nodes(true);
-        clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
 
         return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
             @Override

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.common.Table;
 import org.elasticsearch.index.engine.Segment;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestActionListener;
@@ -59,7 +60,7 @@ public class RestSegmentsAction extends AbstractCatAction {
         final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
 
         final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
-        clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
         clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices);
 
         final RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java

@@ -28,6 +28,7 @@ import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.monitor.process.ProcessInfo;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestUtils;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
 import org.elasticsearch.rest.action.RestActionListener;
@@ -73,7 +74,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
     public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
         final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(getMasterNodeTimeout(request));
         clusterStateRequest.clear().nodes(true);
-        clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
+        RestUtils.consumeDeprecatedLocalParameter(request);
 
         return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
             @Override

+ 2 - 3
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -12,8 +12,8 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
@@ -148,13 +148,12 @@ public final class RemoteClusterConnection implements Closeable {
                         }), RemoteClusterNodesAction.Response::new, TransportResponseHandler.TRANSPORT_WORKER)
                     );
                 } else {
-                    final ClusterStateRequest request = new ClusterStateRequest(
+                    final RemoteClusterStateRequest request = new RemoteClusterStateRequest(
                         /* Timeout doesn't really matter with .local(true) */
                         TimeValue.THIRTY_SECONDS
                     );
                     request.clear();
                     request.nodes(true);
-                    request.local(true); // run this on the node that gets the request it's as good as any other
 
                     transportService.sendRequest(
                         connection,

+ 2 - 2
server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

@@ -14,8 +14,8 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -321,7 +321,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
                     sniffResponseHandler = new RemoteClusterNodesSniffResponseHandler(connection, listener, seedNodesSuppliers);
                 } else {
                     action = ClusterStateAction.NAME;
-                    final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(SNIFF_REQUEST_TIMEOUT);
+                    final RemoteClusterStateRequest clusterStateRequest = new RemoteClusterStateRequest(SNIFF_REQUEST_TIMEOUT);
                     clusterStateRequest.clear();
                     clusterStateRequest.nodes(true);
                     request = clusterStateRequest;

+ 10 - 6
server/src/test/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestTests.java

@@ -30,7 +30,9 @@ public class ClusterStateRequestTests extends ESTestCase {
         for (int i = 0; i < iterations; i++) {
 
             IndicesOptions indicesOptions = IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean());
-            ClusterStateRequest clusterStateRequest = new ClusterStateRequest(TEST_REQUEST_TIMEOUT).routingTable(randomBoolean())
+            RemoteClusterStateRequest clusterStateRequest = new RemoteClusterStateRequest(TEST_REQUEST_TIMEOUT).routingTable(
+                randomBoolean()
+            )
                 .metadata(randomBoolean())
                 .nodes(randomBoolean())
                 .blocks(randomBoolean())
@@ -55,7 +57,7 @@ public class ClusterStateRequestTests extends ESTestCase {
 
             StreamInput streamInput = output.bytes().streamInput();
             streamInput.setTransportVersion(testVersion);
-            ClusterStateRequest deserializedCSRequest = new ClusterStateRequest(streamInput);
+            RemoteClusterStateRequest deserializedCSRequest = new RemoteClusterStateRequest(streamInput);
 
             assertThat(deserializedCSRequest.routingTable(), equalTo(clusterStateRequest.routingTable()));
             assertThat(deserializedCSRequest.metadata(), equalTo(clusterStateRequest.metadata()));
@@ -85,10 +87,13 @@ public class ClusterStateRequestTests extends ESTestCase {
     }
 
     public void testDescription() {
-        assertThat(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).clear().getDescription(), equalTo("cluster state [master timeout [30s]]"));
         assertThat(
-            new ClusterStateRequest(TEST_REQUEST_TIMEOUT).masterNodeTimeout(TimeValue.timeValueMinutes(5)).getDescription(),
-            equalTo("cluster state [routing table, nodes, metadata, blocks, customs, master timeout [5m]]")
+            new ClusterStateRequest(TEST_REQUEST_TIMEOUT).clear().getDescription(),
+            equalTo("cluster state [local, master timeout [30s]]")
+        );
+        assertThat(
+            new ClusterStateRequest(TimeValue.timeValueMinutes(5)).getDescription(),
+            equalTo("cluster state [routing table, nodes, metadata, blocks, customs, local, master timeout [5m]]")
         );
         assertThat(
             new ClusterStateRequest(TEST_REQUEST_TIMEOUT).clear().routingTable(true).getDescription(),
@@ -98,7 +103,6 @@ public class ClusterStateRequestTests extends ESTestCase {
         assertThat(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).clear().metadata(true).getDescription(), containsString("metadata"));
         assertThat(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).clear().blocks(true).getDescription(), containsString("blocks"));
         assertThat(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).clear().customs(true).getDescription(), containsString("customs"));
-        assertThat(new ClusterStateRequest(TEST_REQUEST_TIMEOUT).local(true).getDescription(), containsString("local"));
         assertThat(
             new ClusterStateRequest(TEST_REQUEST_TIMEOUT).waitForMetadataVersion(23L).getDescription(),
             containsString("wait for metadata version [23] with timeout [1m]")

+ 4 - 2
server/src/test/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.client.NoOpClient;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -203,10 +204,11 @@ public class TransportClusterStateActionTests extends ESTestCase {
             threadPool,
             new ActionFilters(Set.of()),
             indexResolver,
-            projectResolver
+            projectResolver,
+            new NoOpClient(threadPool)
         );
         final PlainActionFuture<ClusterStateResponse> future = new PlainActionFuture<>();
-        action.masterOperation(task, request, state, future);
+        action.localClusterStateOperation(task, request, state, future);
         return future.get();
     }
 

+ 3 - 3
server/src/test/java/org/elasticsearch/client/internal/ParentTaskAssigningClientTests.java

@@ -15,8 +15,8 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.RemoteClusterActionType;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.SearchRequest;
@@ -119,7 +119,7 @@ public class ParentTaskAssigningClientTests extends ESTestCase {
                     ClusterStateResponse.class,
                     listener -> remoteClusterClient.execute(
                         ClusterStateAction.REMOTE_TYPE,
-                        new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
+                        new RemoteClusterStateRequest(TEST_REQUEST_TIMEOUT),
                         listener
                     )
                 ).getMessage()
@@ -133,7 +133,7 @@ public class ParentTaskAssigningClientTests extends ESTestCase {
                     listener -> remoteClusterClient.execute(
                         null,
                         ClusterStateAction.REMOTE_TYPE,
-                        new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
+                        new RemoteClusterStateRequest(TEST_REQUEST_TIMEOUT),
                         listener
                     )
                 ).getMessage()

+ 1 - 1
server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java

@@ -131,7 +131,7 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
     }
 
     ClusterState getNodeClusterState(String node) {
-        return client(node).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+        return client(node).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
     }
 
     void assertNoMaster(final String node) throws Exception {

+ 3 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -191,6 +191,7 @@ import org.elasticsearch.telemetry.tracing.Tracer;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLog;
+import org.elasticsearch.test.client.NoOpClient;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BytesRefRecycler;
@@ -2843,7 +2844,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
                         threadPool,
                         actionFilters,
                         indexNameExpressionResolver,
-                        DefaultProjectResolver.INSTANCE
+                        DefaultProjectResolver.INSTANCE,
+                        new NoOpClient(threadPool)
                     )
                 );
                 actions.put(

+ 5 - 5
server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java

@@ -11,8 +11,8 @@ package org.elasticsearch.transport;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.action.search.TransportSearchScrollAction;
@@ -108,7 +108,7 @@ public class RemoteClusterClientTests extends ESTestCase {
                             () -> assertTrue(Thread.currentThread().getName().contains('[' + TEST_THREAD_POOL_NAME + ']'))
                         ),
                         clusterStateResponseListener -> {
-                            final var request = new ClusterStateRequest(TEST_REQUEST_TIMEOUT);
+                            final var request = new RemoteClusterStateRequest(TEST_REQUEST_TIMEOUT);
                             if (randomBoolean()) {
                                 client.execute(ClusterStateAction.REMOTE_TYPE, request, clusterStateResponseListener);
                             } else {
@@ -197,7 +197,7 @@ public class RemoteClusterClientTests extends ESTestCase {
                         final ClusterStateResponse clusterStateResponse = safeAwait(
                             listener -> client.execute(
                                 ClusterStateAction.REMOTE_TYPE,
-                                new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
+                                new RemoteClusterStateRequest(TEST_REQUEST_TIMEOUT),
                                 listener
                             )
                         );
@@ -295,7 +295,7 @@ public class RemoteClusterClientTests extends ESTestCase {
                                 ClusterStateResponse.class,
                                 listener -> client.execute(
                                     ClusterStateAction.REMOTE_TYPE,
-                                    new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
+                                    new RemoteClusterStateRequest(TEST_REQUEST_TIMEOUT),
                                     listener
                                 )
                             ),
@@ -318,7 +318,7 @@ public class RemoteClusterClientTests extends ESTestCase {
                         () -> safeAwait(
                             listener -> client.execute(
                                 ClusterStateAction.REMOTE_TYPE,
-                                new ClusterStateRequest(TEST_REQUEST_TIMEOUT),
+                                new RemoteClusterStateRequest(TEST_REQUEST_TIMEOUT),
                                 listener.map(v -> v)
                             )
                         ),

+ 2 - 2
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -14,8 +14,8 @@ import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchShardsRequest;
 import org.elasticsearch.action.search.SearchShardsResponse;
@@ -163,7 +163,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
             newService.registerRequestHandler(
                 ClusterStateAction.NAME,
                 EsExecutors.DIRECT_EXECUTOR_SERVICE,
-                ClusterStateRequest::new,
+                RemoteClusterStateRequest::new,
                 (request, channel, task) -> {
                     DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
                     for (DiscoveryNode node : knownNodes) {

+ 2 - 2
server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java

@@ -14,8 +14,8 @@ import org.elasticsearch.TransportVersions;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -126,7 +126,7 @@ public class SniffConnectionStrategyTests extends ESTestCase {
             newService.registerRequestHandler(
                 ClusterStateAction.NAME,
                 EsExecutors.DIRECT_EXECUTOR_SERVICE,
-                ClusterStateRequest::new,
+                RemoteClusterStateRequest::new,
                 (request, channel, task) -> {
                     DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
                     for (DiscoveryNode node : knownNodes) {

+ 3 - 17
test/framework/src/main/java/org/elasticsearch/indices/recovery/AbstractIndexRecoveryIntegTestCase.java

@@ -58,11 +58,9 @@ import java.util.function.BiConsumer;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 
 public abstract class AbstractIndexRecoveryIntegTestCase extends ESIntegTestCase {
@@ -338,21 +336,9 @@ public abstract class AbstractIndexRecoveryIntegTestCase extends ESIntegTestCase
                 if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action) && count.incrementAndGet() == 1) {
                     // ensures that it's considered as valid recovery attempt by source
                     try {
-                        assertBusy(
-                            () -> assertThat(
-                                "Expected there to be some initializing shards",
-                                client(blueNodeName).admin()
-                                    .cluster()
-                                    .prepareState(TEST_REQUEST_TIMEOUT)
-                                    .setLocal(true)
-                                    .get()
-                                    .getState()
-                                    .getRoutingTable()
-                                    .index("test")
-                                    .shard(0)
-                                    .getAllInitializingShards(),
-                                not(empty())
-                            )
+                        awaitClusterState(
+                            blueNodeName,
+                            state -> state.getRoutingTable().index("test").shard(0).getAllInitializingShards().isEmpty() == false
                         );
                     } catch (Exception e) {
                         throw new RuntimeException(e);

+ 2 - 29
test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

@@ -12,13 +12,11 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.ClusterStatePublicationEvent;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.NodeConnectionsService;
@@ -39,14 +37,12 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.telemetry.tracing.Tracer;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
@@ -235,31 +231,8 @@ public class ClusterServiceUtils {
 
     public static void awaitClusterState(Logger logger, Predicate<ClusterState> statePredicate, ClusterService clusterService)
         throws Exception {
-        final PlainActionFuture<Void> future = new PlainActionFuture<>();
-        ClusterStateObserver.waitForState(
-            clusterService,
-            clusterService.getClusterApplierService().threadPool().getThreadContext(),
-            new ClusterStateObserver.Listener() {
-                @Override
-                public void onNewClusterState(ClusterState state) {
-                    future.onResponse(null);
-                }
-
-                @Override
-                public void onClusterServiceClose() {
-                    future.onFailure(new NodeClosedException(clusterService.localNode()));
-                }
-
-                @Override
-                public void onTimeout(TimeValue timeout) {
-                    assert false : "onTimeout called with no timeout set";
-                }
-            },
-            statePredicate,
-            null,
-            logger
-        );
-        future.get(30L, TimeUnit.SECONDS);
+        final var listener = addTemporaryStateListener(clusterService, statePredicate, ESTestCase.TEST_REQUEST_TIMEOUT);
+        ESTestCase.safeAwait(listener, ESTestCase.TEST_REQUEST_TIMEOUT);
     }
 
     public static void awaitNoPendingTasks(ClusterService clusterService) {

+ 13 - 4
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -979,7 +979,14 @@ public abstract class ESIntegTestCase extends ESTestCase {
      * Note that this does not guarantee that all other nodes in the cluster are on the same cluster state version already.
      */
     public void awaitMasterNotFound() {
-        var viaNode = internalCluster().getRandomNodeName();
+        awaitMasterNotFound(internalCluster().getRandomNodeName());
+    }
+
+    /**
+     * Waits for the given node to not see a master node in the cluster state.
+     * Note that this does not guarantee that all other nodes in the cluster are on the same cluster state version already.
+     */
+    public void awaitMasterNotFound(String viaNode) {
         // We use a temporary state listener instead of `awaitClusterState` here because the `ClusterStateObserver` doesn't run the
         // predicate if the cluster state version didn't change. When a master node leaves the cluster (i.e. what this method is used for),
         // the cluster state version is not incremented.
@@ -1212,6 +1219,10 @@ public abstract class ESIntegTestCase extends ESTestCase {
         awaitClusterState(logger, internalCluster().getMasterName(), statePredicate);
     }
 
+    protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
+        ClusterServiceUtils.awaitClusterState(logger, statePredicate, internalCluster().getInstance(ClusterService.class, viaNode));
+    }
+
     public static void awaitClusterState(Logger logger, Predicate<ClusterState> statePredicate) throws Exception {
         awaitClusterState(logger, internalCluster().getMasterName(), statePredicate);
     }
@@ -1382,9 +1393,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
         final var masterName = internalCluster().getMasterName();
         for (Client client : cluster().getClients()) {
             localStates.add(
-                SubscribableListener.newForked(
-                    l -> client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).all().setLocal(true).execute(l)
-                )
+                SubscribableListener.newForked(l -> client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).all().execute(l))
             );
         }
         try (RefCountingListener refCountingListener = new RefCountingListener(future)) {

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -2039,7 +2039,7 @@ public final class InternalTestCluster extends TestCluster {
         }
         try {
             ClusterServiceUtils.awaitClusterState(logger, state -> state.nodes().getMasterNode() != null, clusterService(viaNode));
-            final ClusterState state = client(viaNode).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+            final ClusterState state = client(viaNode).admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
             final DiscoveryNode masterNode = state.nodes().getMasterNode();
             if (masterNode == null) {
                 throw new AssertionError("Master is not stable but the method expects a stable master node");

+ 4 - 4
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

@@ -14,8 +14,8 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.RemoteClusterActionType;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
@@ -207,7 +207,7 @@ public class CcrLicenseChecker {
     public static void checkRemoteClusterLicenseAndFetchClusterState(
         final Client client,
         final String clusterAlias,
-        final ClusterStateRequest request,
+        final RemoteClusterStateRequest request,
         final Consumer<Exception> onFailure,
         final Consumer<ClusterStateResponse> leaderClusterStateConsumer
     ) {
@@ -242,7 +242,7 @@ public class CcrLicenseChecker {
         final Client client,
         final String clusterAlias,
         final RemoteClusterClient remoteClient,
-        final ClusterStateRequest request,
+        final RemoteClusterStateRequest request,
         final Consumer<Exception> onFailure,
         final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
         final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
@@ -279,7 +279,7 @@ public class CcrLicenseChecker {
         final Client client,
         final String clusterAlias,
         final RemoteClusterClient remoteClient,
-        final ClusterStateRequest request,
+        final RemoteClusterStateRequest request,
         final Consumer<Exception> onFailure,
         final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
         final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,

+ 2 - 3
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

@@ -11,8 +11,8 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -302,10 +302,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
                     CcrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
                         client,
                         remoteCluster,
-                        new ClusterStateRequest(waitForMetadataTimeOut).clear()
+                        new RemoteClusterStateRequest(waitForMetadataTimeOut).clear()
                             .metadata(true)
                             .routingTable(true)
-                            .local(true)
                             .waitForMetadataVersion(metadataVersion)
                             .waitForTimeout(waitForMetadataTimeOut),
                         e -> handler.accept(null, e),

+ 4 - 4
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java

@@ -10,7 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.RequestValidators;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.client.internal.RemoteClusterClient;
@@ -33,8 +33,8 @@ public final class CcrRequests {
 
     private CcrRequests() {}
 
-    public static ClusterStateRequest metadataRequest(String leaderIndex) {
-        ClusterStateRequest clusterStateRequest = new ClusterStateRequest(TimeValue.MAX_VALUE);
+    public static RemoteClusterStateRequest metadataRequest(String leaderIndex) {
+        RemoteClusterStateRequest clusterStateRequest = new RemoteClusterStateRequest(TimeValue.MAX_VALUE);
         clusterStateRequest.clear();
         clusterStateRequest.metadata(true);
         clusterStateRequest.indices(leaderIndex);
@@ -61,7 +61,7 @@ public final class CcrRequests {
         Supplier<TimeValue> timeoutSupplier,
         ActionListener<IndexMetadata> listener
     ) {
-        final ClusterStateRequest request = CcrRequests.metadataRequest(index.getName());
+        final RemoteClusterStateRequest request = CcrRequests.metadataRequest(index.getName());
         if (metadataVersion > 0) {
             request.waitForMetadataVersion(metadataVersion).waitForTimeout(timeoutSupplier.get());
         }

+ 2 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

@@ -7,8 +7,8 @@
 package org.elasticsearch.xpack.ccr.action;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
@@ -130,7 +130,7 @@ public class TransportPutAutoFollowPatternAction extends AcknowledgedTransportMa
         CcrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
             client,
             request.getRemoteCluster(),
-            new ClusterStateRequest(request.masterNodeTimeout()).clear().metadata(true),
+            new RemoteClusterStateRequest(request.masterNodeTimeout()).clear().metadata(true),
             listener::onFailure,
             consumer
         );

+ 2 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -19,8 +19,8 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.RemoteClusterActionType;
 import org.elasticsearch.action.SingleResultDeduplicator;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
@@ -175,7 +175,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             threadPool.getThreadContext(),
             l -> getRemoteClusterClient().execute(
                 ClusterStateAction.REMOTE_TYPE,
-                new ClusterStateRequest(TimeValue.MAX_VALUE).clear().metadata(true).nodes(true),
+                new RemoteClusterStateRequest(TimeValue.MAX_VALUE).clear().metadata(true).nodes(true),
                 l.map(ClusterStateResponse::getState)
             )
         );

+ 2 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseCheckerTests.java

@@ -8,8 +8,8 @@
 package org.elasticsearch.xpack.ccr;
 
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.RemoteClusterClient;
 import org.elasticsearch.cluster.ClusterName;
@@ -138,7 +138,7 @@ public class CcrLicenseCheckerTests extends ESTestCase {
                 Client client,
                 String clusterAlias,
                 RemoteClusterClient remoteClient,
-                ClusterStateRequest request,
+                RemoteClusterStateRequest request,
                 Consumer<Exception> onFailure,
                 Consumer<ClusterStateResponse> leaderClusterStateConsumer,
                 Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,

+ 1 - 10
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

@@ -12,7 +12,6 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -126,15 +125,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
             logger.info("Stopping dedicated master node");
             Settings masterDataPathSettings = internalCluster().dataPathSettings(internalCluster().getMasterName());
             internalCluster().stopCurrentMasterNode();
-            assertBusy(() -> {
-                ClusterState state = client(mlAndDataNode).admin()
-                    .cluster()
-                    .prepareState(TEST_REQUEST_TIMEOUT)
-                    .setLocal(true)
-                    .get()
-                    .getState();
-                assertNull(state.nodes().getMasterNodeId());
-            });
+            awaitMasterNotFound(mlAndDataNode);
             logger.info("Restarting dedicated master node");
             internalCluster().startNode(Settings.builder().put(masterDataPathSettings).put(masterOnlyNode()).build());
             ensureStableCluster();

+ 2 - 2
x-pack/plugin/security/qa/security-trial/src/javaRestTest/java/org/elasticsearch/xpack/security/crossclusteraccess/CrossClusterAccessHeadersForCcsRestIT.java

@@ -13,8 +13,8 @@ import org.apache.http.nio.entity.NStringEntity;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.RemoteClusterStateRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchShardsRequest;
 import org.elasticsearch.action.search.SearchShardsResponse;
@@ -1174,7 +1174,7 @@ public class CrossClusterAccessHeadersForCcsRestIT extends SecurityOnTrialLicens
             service.registerRequestHandler(
                 ClusterStateAction.NAME,
                 EsExecutors.DIRECT_EXECUTOR_SERVICE,
-                ClusterStateRequest::new,
+                RemoteClusterStateRequest::new,
                 (request, channel, task) -> {
                     capturedHeaders.add(
                         new CapturedActionWithHeaders(task.getAction(), Map.copyOf(threadPool.getThreadContext().getHeaders()))

+ 1 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java

@@ -398,7 +398,7 @@ public abstract class SecurityIntegTestCase extends ESIntegTestCase {
     public void assertSecurityIndexActive(TestCluster testCluster) throws Exception {
         for (Client client : testCluster.getClients()) {
             assertBusy(() -> {
-                ClusterState clusterState = client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).setLocal(true).get().getState();
+                ClusterState clusterState = client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
                 assertFalse(clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK));
                 Index securityIndex = resolveSecurityIndex(clusterState.metadata());
                 assertNotNull(securityIndex);