Browse Source

Set timeout of master requests on follower to unbounded (#60070)

Today, a follow task will fail if the master node of the follower 
cluster is temporarily overloaded and unable to process master node
requests (such as update mapping, setting, or alias) from a follow-task
within the default timeout. This error is transient, and follow-tasks
should not abort. We can avoid this problem by setting the timeout of
master node requests on the follower cluster to unbounded.

Closes #56891
Nhat Nguyen 5 years ago
parent
commit
f238d0a367

+ 1 - 0
libs/core/src/main/java/org/elasticsearch/common/unit/TimeValue.java

@@ -30,6 +30,7 @@ public class TimeValue implements Comparable<TimeValue> {
 
     public static final TimeValue MINUS_ONE = timeValueMillis(-1);
     public static final TimeValue ZERO = timeValueMillis(0);
+    public static final TimeValue MAX_VALUE = TimeValue.timeValueNanos(Long.MAX_VALUE);
 
     private static final long C0 = 1L;
     private static final long C1 = C0 * 1000L;

+ 15 - 11
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

@@ -69,7 +69,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             fail("need repository");
         }
 
-        ClusterUpdateSettingsRequest putSecondCluster = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest putSecondCluster = newSettingsRequest();
         String address = getFollowerCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
         putSecondCluster.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", address));
         assertAcked(followerClient().admin().cluster().updateSettings(putSecondCluster).actionGet());
@@ -83,19 +83,19 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             fail("need repository");
         }
 
-        ClusterUpdateSettingsRequest deleteLeaderCluster = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest deleteLeaderCluster = newSettingsRequest();
         deleteLeaderCluster.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", ""));
         assertAcked(followerClient().admin().cluster().updateSettings(deleteLeaderCluster).actionGet());
 
         expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(leaderClusterRepoName));
 
-        ClusterUpdateSettingsRequest deleteSecondCluster = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest deleteSecondCluster = newSettingsRequest();
         deleteSecondCluster.persistentSettings(Settings.builder().put("cluster.remote.follower_cluster_copy.seeds", ""));
         assertAcked(followerClient().admin().cluster().updateSettings(deleteSecondCluster).actionGet());
 
         expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(followerCopyRepoName));
 
-        ClusterUpdateSettingsRequest putLeaderRequest = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest putLeaderRequest = newSettingsRequest();
         address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
         putLeaderRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
         assertAcked(followerClient().admin().cluster().updateSettings(putLeaderRequest).actionGet());
@@ -119,7 +119,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
         RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
             .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
-            .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
+            .renameReplacement(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)
             .indexSettings(settingsBuilder);
 
         PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
@@ -160,7 +160,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
     }
 
     public void testDocsAreRecovered() throws Exception {
-        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest();
         String chunkSize = randomFrom("4KB", "128KB", "1MB");
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize));
         assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -204,7 +204,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             assertExpectedDocument(followerIndex, i);
         }
 
-        settingsRequest = new ClusterUpdateSettingsRequest();
+        settingsRequest = newSettingsRequest();
         ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY);
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue));
         assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -213,7 +213,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
     public void testRateLimitingIsEmployed() throws Exception {
         boolean followerRateLimiting = randomBoolean();
 
-        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest();
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
         if (followerRateLimiting) {
             assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -257,7 +257,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
         RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
             .indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
-            .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS))
+            .renameReplacement(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)
             .indexSettings(settingsBuilder);
 
         PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
@@ -270,7 +270,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
             assertTrue(restoreSources.stream().anyMatch(cr -> cr.getThrottleTime() > 0));
         }
 
-        settingsRequest = new ClusterUpdateSettingsRequest();
+        settingsRequest = newSettingsRequest();
         ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
         if (followerRateLimiting) {
@@ -281,7 +281,7 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
     }
 
     public void testIndividualActionsTimeout() throws Exception {
-        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        ClusterUpdateSettingsRequest settingsRequest = newSettingsRequest();
         TimeValue timeValue = TimeValue.timeValueMillis(100);
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), timeValue));
         assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -359,6 +359,10 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
         }
     }
 
+    private ClusterUpdateSettingsRequest newSettingsRequest() {
+        return new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
+    }
+
     public void testFollowerMappingIsUpdated() throws IOException {
         String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
         String leaderIndex = "index1";

+ 6 - 5
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java

@@ -119,7 +119,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
             final int numberOfReplicas,
             final String followerIndex,
             final int numberOfDocuments) throws IOException {
-        final ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+        final ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
         final String chunkSize = new ByteSizeValue(randomFrom(4, 128, 1024), ByteSizeUnit.KB).getStringRep();
         settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize));
         assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -129,7 +129,8 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
         final Map<String, String> additionalSettings = new HashMap<>();
         additionalSettings.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200).getStringRep());
         final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalSettings);
-        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
+            .setMasterNodeTimeout(TimeValue.MAX_VALUE).setSource(leaderIndexSettings, XContentType.JSON));
         ensureLeaderGreen(leaderIndex);
 
         logger.info("indexing [{}] docs", numberOfDocuments);
@@ -152,7 +153,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
                 .indicesOptions(indicesOptions)
                 .renamePattern("^(.*)$")
                 .renameReplacement(followerIndex)
-                .masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS));
+                .masterNodeTimeout(TimeValue.MAX_VALUE);
     }
 
     public void testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception {
@@ -470,7 +471,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
         ensureFollowerGreen(true, followerIndex);
 
         pauseFollow(followerIndex);
-        followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
+        followerClient().admin().indices().close(new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
 
         // we will disrupt requests to remove retention leases for these random shards
         final Set<Integer> shardIds =
@@ -944,7 +945,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
         ensureFollowerGreen(true, followerIndex);
 
         pauseFollow(followerIndex);
-        followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
+        followerClient().admin().indices().close(new CloseIndexRequest(followerIndex).masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
 
         final ClusterStateResponse followerIndexClusterState =
                 followerClient().admin().cluster().prepareState().clear().setMetadata(true).setIndices(followerIndex).get();

+ 2 - 2
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java

@@ -96,7 +96,7 @@ public class CloseFollowerIndexIT extends CcrIntegTestCase {
 
         atLeastDocsIndexed(followerClient(), "index2", 32);
 
-        CloseIndexRequest closeIndexRequest = new CloseIndexRequest("index2");
+        CloseIndexRequest closeIndexRequest = new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE);
         closeIndexRequest.waitForActiveShards(ActiveShardCount.NONE);
         AcknowledgedResponse response = followerClient().admin().indices().close(closeIndexRequest).get();
         assertThat(response.isAcknowledged(), is(true));
@@ -111,7 +111,7 @@ public class CloseFollowerIndexIT extends CcrIntegTestCase {
             thread.join();
         }
 
-        assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2")).get());
+        assertAcked(followerClient().admin().indices().open(new OpenIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).get());
 
         clusterState = followerClient().admin().cluster().prepareState().get().getState();
         assertThat(clusterState.metadata().index("index2").getState(), is(IndexMetadata.State.OPEN));

+ 1 - 1
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java

@@ -223,7 +223,7 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
         });
         flushingOnFollower.start();
         awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), 50);
-        followerClient().admin().indices().prepareUpdateSettings("follower-index")
+        followerClient().admin().indices().prepareUpdateSettings("follower-index").setMasterNodeTimeout(TimeValue.MAX_VALUE)
             .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas + 1).build()).get();
         ensureFollowerGreen("follower-index");
         awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), 100);

+ 25 - 20
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

@@ -138,7 +138,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         final int numberOfPrimaryShards = randomIntBetween(1, 3);
         int numberOfReplicas = between(0, 1);
 
-        followerClient().admin().cluster().prepareUpdateSettings()
+        followerClient().admin().cluster().prepareUpdateSettings().setMasterNodeTimeout(TimeValue.MAX_VALUE)
             .setTransientSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(),
                 new ByteSizeValue(randomIntBetween(1, 1000), ByteSizeUnit.KB)))
             .get();
@@ -410,6 +410,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
     }
 
     public void testDoNotAllowPutMappingToFollower() throws Exception {
+        removeMasterNodeRequestsValidatorOnFollowerCluster();
         final String leaderIndexSettings = getIndexSettings(between(1, 2), between(0, 1));
         assertAcked(leaderClient().admin().indices().prepareCreate("index-1").setSource(leaderIndexSettings, XContentType.JSON));
         followerClient().execute(PutFollowAction.INSTANCE, putFollow("index-1", "index-2")).get();
@@ -432,7 +433,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
                 getIndexSettings(between(1, 2), between(0, 1));
         assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
         followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader", "follower")).get();
-        final IndicesAliasesRequest request = new IndicesAliasesRequest()
+        final IndicesAliasesRequest request = new IndicesAliasesRequest().masterNodeTimeout(TimeValue.MAX_VALUE)
                 .addAliasAction(IndicesAliasesRequest.AliasActions.add().index("follower").alias("follower_alias"));
         final ElasticsearchStatusException e =
                 expectThrows(ElasticsearchStatusException.class, () -> followerClient().admin().indices().aliases(request).actionGet());
@@ -449,10 +450,10 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
         followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader", "follower")).get();
         pauseFollow("follower");
-        followerClient().admin().indices().close(new CloseIndexRequest("follower")).actionGet();
+        followerClient().admin().indices().close(new CloseIndexRequest("follower").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
         assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("follower")).actionGet());
-        followerClient().admin().indices().open(new OpenIndexRequest("follower")).actionGet();
-        final IndicesAliasesRequest request = new IndicesAliasesRequest()
+        followerClient().admin().indices().open(new OpenIndexRequest("follower").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
+        final IndicesAliasesRequest request = new IndicesAliasesRequest().masterNodeTimeout(TimeValue.MAX_VALUE)
                 .addAliasAction(IndicesAliasesRequest.AliasActions.add().index("follower").alias("follower_alias"));
         assertAcked(followerClient().admin().indices().aliases(request).actionGet());
         final GetAliasesResponse response =
@@ -576,7 +577,10 @@ public class IndexFollowingIT extends CcrIntegTestCase {
     public void testFollowNonExistentIndex() throws Exception {
         String indexSettings = getIndexSettings(1, 0);
         assertAcked(leaderClient().admin().indices().prepareCreate("test-leader").setSource(indexSettings, XContentType.JSON).get());
-        assertAcked(followerClient().admin().indices().prepareCreate("test-follower").setSource(indexSettings, XContentType.JSON).get());
+        assertAcked(followerClient().admin().indices().prepareCreate("test-follower")
+            .setSource(indexSettings, XContentType.JSON)
+            .setMasterNodeTimeout(TimeValue.MAX_VALUE)
+            .get());
         ensureLeaderGreen("test-leader");
         ensureFollowerGreen("test-follower");
         // Leader index does not exist.
@@ -635,9 +639,9 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         PutFollowAction.Request followRequest = putFollow("index1", "index2");
         followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
         pauseFollow("index2");
-        followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
+        followerClient().admin().indices().close(new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
 
-        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("index2");
+        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE);
         updateSettingsRequest.settings(Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build());
         Exception e = expectThrows(IllegalArgumentException.class,
             () -> followerClient().admin().indices().updateSettings(updateSettingsRequest).actionGet());
@@ -691,7 +695,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get();
         assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
 
-        followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
+        followerClient().admin().indices().close(new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
         leaderClient().prepareIndex("index1").setId("2").setSource("{}", XContentType.JSON).get();
         assertBusy(() -> {
             StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
@@ -700,7 +704,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
             assertThat(response.getStatsResponses(), hasSize(1));
             assertThat(response.getStatsResponses().get(0).status().failedWriteRequests(), greaterThanOrEqualTo(1L));
         });
-        followerClient().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
+        followerClient().admin().indices().open(new OpenIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
         assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(2L)));
 
         pauseFollow("index2");
@@ -765,7 +769,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         assertTrue(response.isIndexFollowingStarted());
 
         pauseFollow(followerIndex);
-        assertAcked(leaderClient().admin().indices().prepareClose(leaderIndex));
+        assertAcked(leaderClient().admin().indices().prepareClose(leaderIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
 
         expectThrows(IndexClosedException.class, () ->
             followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet());
@@ -784,7 +788,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         leaderClient().prepareIndex("index1").setId("1").setSource("{}", XContentType.JSON).get();
         assertBusy(() -> assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(1L)));
 
-        followerClient().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
+        followerClient().admin().indices().delete(new DeleteIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
         leaderClient().prepareIndex("index1").setId("2").setSource("{}", XContentType.JSON).get();
         assertBusy(() -> {
             StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, new StatsRequest()).actionGet();
@@ -808,7 +812,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
                 .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                 .build()));
         followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader", "follower")).get();
-        assertAcked(followerClient().admin().indices().prepareCreate("regular-index"));
+        assertAcked(followerClient().admin().indices().prepareCreate("regular-index").setMasterNodeTimeout(TimeValue.MAX_VALUE));
         assertAcked(followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower")).actionGet());
         assertThat(expectThrows(IllegalArgumentException.class, () -> followerClient().execute(
             PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower")).actionGet()).getMessage(),
@@ -836,9 +840,9 @@ public class IndexFollowingIT extends CcrIntegTestCase {
 
         // Turn follow index into a regular index by: pausing shard follow, close index, unfollow index and then open index:
         pauseFollow("index2");
-        followerClient().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
+        followerClient().admin().indices().close(new CloseIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
         assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet());
-        followerClient().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
+        followerClient().admin().indices().open(new OpenIndexRequest("index2").masterNodeTimeout(TimeValue.MAX_VALUE)).actionGet();
         ensureFollowerGreen("index2");
 
         // Indexing succeeds now, because index2 is no longer a follow index:
@@ -1144,6 +1148,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
     }
 
     public void testMustCloseIndexAndPauseToRestartWithPutFollowing() throws Exception {
+        removeMasterNodeRequestsValidatorOnFollowerCluster();
         final int numberOfPrimaryShards = randomIntBetween(1, 3);
         final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1));
         assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
@@ -1290,7 +1295,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
             exceptionConsumer.accept(exceptions);
         });
 
-        followerClient().admin().indices().prepareClose("index2").get();
+        followerClient().admin().indices().prepareClose("index2").setMasterNodeTimeout(TimeValue.MAX_VALUE).get();
         pauseFollow("index2");
         if (randomBoolean()) {
             assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet());
@@ -1326,11 +1331,11 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         assertTrue(response.isFollowIndexShardsAcked());
         assertTrue(response.isIndexFollowingStarted());
 
-        logger.info("Indexing [{}] docs while updateing remote config", firstBatchNumDocs);
+        logger.info("Indexing [{}] docs while updating remote config", firstBatchNumDocs);
         try (BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs,
             randomIntBetween(1, 5))) {
 
-            ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+            ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
             String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
             Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
             Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
@@ -1361,7 +1366,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
 
             assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
         } finally {
-            ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
+            ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
             String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
             Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
             Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
@@ -1391,7 +1396,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
             assertThat(listTasksResponse.getTasks(), hasSize(numberOfShards));
         });
 
-        assertAcked(followerClient().admin().indices().prepareDelete("index2"));
+        assertAcked(followerClient().admin().indices().prepareDelete("index2").setMasterNodeTimeout(TimeValue.MAX_VALUE));
 
         assertBusy(() -> {
             String action = ShardFollowTask.NAME + "[c]";

+ 1 - 0
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/PrimaryFollowerAllocationIT.java

@@ -102,6 +102,7 @@ public class PrimaryFollowerAllocationIT extends CcrIntegTestCase {
         });
         // Follower primaries can be relocated to nodes without the remote cluster client role
         followerClient().admin().indices().prepareUpdateSettings(followerIndex)
+            .setMasterNodeTimeout(TimeValue.MAX_VALUE)
             .setSettings(Settings.builder().put("index.routing.allocation.include._name", String.join(",", dataOnlyNodes)))
             .get();
         assertBusy(() -> {

+ 1 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java

@@ -43,6 +43,7 @@ public final class CcrRequests {
         PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
         putMappingRequest.origin("ccr");
         putMappingRequest.source(mappingMetadata.source().string(), XContentType.JSON);
+        putMappingRequest.masterNodeTimeout(TimeValue.MAX_VALUE);
         return putMappingRequest;
     }
 

+ 8 - 6
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -205,8 +205,9 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                         // if so just update the follower index's settings:
                         if (updatedSettings.keySet().stream().allMatch(indexScopedSettings::isDynamicSetting)) {
                             // If only dynamic settings have been updated then just update these settings in follower index:
-                            final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName());
-                            updateSettingsRequest.settings(updatedSettings);
+                            final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName())
+                                .masterNodeTimeout(TimeValue.MAX_VALUE)
+                                .settings(updatedSettings);
                             followerClient.admin().indices().updateSettings(updateSettingsRequest,
                                 ActionListener.wrap(response -> finalHandler.accept(leaderIMD.getSettingsVersion()), errorHandler));
                         } else {
@@ -327,7 +328,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                     if (aliasActions.isEmpty()) {
                         handler.accept(leaderIndexMetadata.getAliasesVersion());
                     } else {
-                        final var request = new IndicesAliasesRequest();
+                        final var request = new IndicesAliasesRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
                         request.origin("ccr");
                         aliasActions.forEach(request::addAliasAction);
                         followerClient.admin().indices().aliases(
@@ -347,7 +348,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                                                               Settings updatedSettings,
                                                               Runnable handler,
                                                               Consumer<Exception> onFailure) {
-                CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex);
+                CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex).masterNodeTimeout(TimeValue.MAX_VALUE);
                 CheckedConsumer<CloseIndexResponse, Exception> onResponse = response -> {
                     updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure);
                 };
@@ -358,7 +359,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                                                     Settings updatedSettings,
                                                     Runnable handler,
                                                     Consumer<Exception> onFailure) {
-                final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex);
+                final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex)
+                    .masterNodeTimeout(TimeValue.MAX_VALUE);
                 updateSettingsRequest.settings(updatedSettings);
                 CheckedConsumer<AcknowledgedResponse, Exception> onResponse = response -> openIndex(followIndex, handler, onFailure);
                 followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(onResponse, onFailure));
@@ -367,7 +369,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
             private void openIndex(String followIndex,
                                    Runnable handler,
                                    Consumer<Exception> onFailure) {
-                OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex);
+                OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex).masterNodeTimeout(TimeValue.MAX_VALUE);
                 CheckedConsumer<OpenIndexResponse, Exception> onResponse = response -> handler.run();
                 followerClient.admin().indices().open(openIndexRequest, ActionListener.wrap(onResponse, onFailure));
             }

+ 1 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -454,8 +454,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         final IndexMetadata leaderIndexMetadata = indexMetadataFuture.actionGet(ccrSettings.getRecoveryActionTimeout());
         final MappingMetadata mappingMetadata = leaderIndexMetadata.mapping();
         if (mappingMetadata != null) {
-            final PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetadata)
-                .masterNodeTimeout(TimeValue.timeValueMinutes(30));
+            final PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetadata);
             followerClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
         }
     }

+ 47 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

@@ -20,6 +20,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
@@ -77,6 +78,7 @@ import org.elasticsearch.test.NodeConfigurationSource;
 import org.elasticsearch.test.TestCluster;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.RemoteConnectionStrategy;
+import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.nio.MockNioTransportPlugin;
 import org.elasticsearch.xpack.ccr.CcrSettings;
@@ -84,11 +86,16 @@ import org.elasticsearch.xpack.ccr.LocalStateCcr;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
+import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
 import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
+import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
 import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
+import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
 import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
+import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
 import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
 import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
+import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -146,6 +153,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         if (clusterGroup != null && reuseClusters()) {
             clusterGroup.leaderCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster());
             clusterGroup.followerCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster());
+            setupMasterNodeRequestsValidatorOnFollowerCluster();
             return;
         }
 
@@ -176,6 +184,43 @@ public abstract class CcrIntegTestCase extends ESTestCase {
             ClusterService clusterService = followerCluster.getInstance(ClusterService.class);
             assertNotNull(clusterService.state().metadata().custom(LicensesMetadata.TYPE));
         });
+        setupMasterNodeRequestsValidatorOnFollowerCluster();
+    }
+
+    protected void setupMasterNodeRequestsValidatorOnFollowerCluster() {
+        final InternalTestCluster followerCluster = clusterGroup.followerCluster;
+        for (String nodeName : followerCluster.getNodeNames()) {
+            MockTransportService transportService = (MockTransportService) followerCluster.getInstance(TransportService.class, nodeName);
+            transportService.addSendBehavior((connection, requestId, action, request, options) -> {
+                if (isCcrAdminRequest(request) == false && request instanceof AcknowledgedRequest<?>) {
+                    final TimeValue masterTimeout = ((AcknowledgedRequest<?>) request).masterNodeTimeout();
+                    if (masterTimeout == null || masterTimeout.nanos() != TimeValue.MAX_VALUE.nanos()) {
+                        throw new AssertionError("time out of a master request [" + request + "] on the follower is not set to unbounded");
+                    }
+                }
+                connection.sendRequest(requestId, action, request, options);
+            });
+        }
+    }
+
+    protected void removeMasterNodeRequestsValidatorOnFollowerCluster() {
+        final InternalTestCluster followerCluster = clusterGroup.followerCluster;
+        for (String nodeName : followerCluster.getNodeNames()) {
+            MockTransportService transportService =
+                (MockTransportService) getFollowerCluster().getInstance(TransportService.class, nodeName);
+            transportService.clearAllRules();
+        }
+    }
+
+    private static boolean isCcrAdminRequest(TransportRequest request) {
+        return request instanceof PutFollowAction.Request ||
+            request instanceof ResumeFollowAction.Request ||
+            request instanceof PauseFollowAction.Request ||
+            request instanceof UnfollowAction.Request ||
+            request instanceof ForgetFollowerAction.Request ||
+            request instanceof PutAutoFollowPatternAction.Request ||
+            request instanceof ActivateAutoFollowPatternAction.Request ||
+            request instanceof DeleteAutoFollowPatternAction.Request;
     }
 
     /**
@@ -183,7 +228,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
      * is not replicated and if tests kill nodes, we have to wait 60s by default...
      */
     protected void disableDelayedAllocation(String index) {
-        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index);
+        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(index).masterNodeTimeout(TimeValue.MAX_VALUE);
         Settings.Builder settingsBuilder = Settings.builder();
         settingsBuilder.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0);
         updateSettingsRequest.settings(settingsBuilder);
@@ -193,6 +238,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
     @After
     public void afterTest() throws Exception {
         ensureEmptyWriteBuffers();
+        removeMasterNodeRequestsValidatorOnFollowerCluster();
         String masterNode = clusterGroup.followerCluster.getMasterName();
         ClusterService clusterService = clusterGroup.followerCluster.getInstance(ClusterService.class, masterNode);
         removeCCRRelatedMetadataFromClusterState(clusterService);