瀏覽代碼

Replicate index settings to followers (#35089)

This commit uses the index settings version so that a follower can
replicate index settings changes as needed from the leader.

Co-authored-by: Martijn van Groningen <martijn.v.groningen@gmail.com>
Jason Tedor 7 年之前
父節點
當前提交
4f4fc3b8f8
共有 22 個文件被更改,包括 650 次插入67 次删除
  1. 5 0
      docs/reference/ccr/apis/follow/get-follow-stats.asciidoc
  2. 1 1
      server/src/main/java/org/elasticsearch/node/Node.java
  3. 4 1
      server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java
  4. 4 1
      server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java
  5. 6 2
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
  6. 56 12
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java
  7. 50 5
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
  8. 89 3
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java
  9. 2 1
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java
  10. 229 0
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
  11. 2 0
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java
  12. 28 10
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java
  13. 2 0
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java
  14. 123 21
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java
  15. 8 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
  16. 1 0
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java
  17. 5 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java
  18. 19 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java
  19. 3 0
      x-pack/plugin/core/src/main/resources/monitoring-es.json
  20. 5 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java
  21. 4 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  22. 4 1
      x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java

+ 5 - 0
docs/reference/ccr/apis/follow/get-follow-stats.asciidoc

@@ -111,6 +111,9 @@ The `shards` array consists of objects containing the following fields:
 `indices[].shards[].follower_mapping_version`::
   (long) the mapping version the follower is synced up to
 
+`indices[].shards[].follower_settings_version`::
+  (long) the index settings version the follower is synced up to
+
 `indices[].shards[].total_read_time_millis`::
   (long) the total time reads were outstanding, measured from the time a read
   was sent to the leader to the time a reply was returned to the follower
@@ -206,6 +209,7 @@ The API returns the following results:
           "outstanding_write_requests" : 2,
           "write_buffer_operation_count" : 64,
           "follower_mapping_version" : 4,
+          "follower_settings_version" : 2,
           "total_read_time_millis" : 32768,
           "total_read_remote_exec_time_millis" : 16384,
           "successful_read_requests" : 32,
@@ -234,6 +238,7 @@ The API returns the following results:
 // TESTRESPONSE[s/"outstanding_write_requests" : 2/"outstanding_write_requests" : $body.indices.0.shards.0.outstanding_write_requests/]
 // TESTRESPONSE[s/"write_buffer_operation_count" : 64/"write_buffer_operation_count" : $body.indices.0.shards.0.write_buffer_operation_count/]
 // TESTRESPONSE[s/"follower_mapping_version" : 4/"follower_mapping_version" : $body.indices.0.shards.0.follower_mapping_version/]
+// TESTRESPONSE[s/"follower_settings_version" : 2/"follower_settings_version" : $body.indices.0.shards.0.follower_settings_version/]
 // TESTRESPONSE[s/"total_read_time_millis" : 32768/"total_read_time_millis" : $body.indices.0.shards.0.total_read_time_millis/]
 // TESTRESPONSE[s/"total_read_remote_exec_time_millis" : 16384/"total_read_remote_exec_time_millis" : $body.indices.0.shards.0.total_read_remote_exec_time_millis/]
 // TESTRESPONSE[s/"successful_read_requests" : 32/"successful_read_requests" : $body.indices.0.shards.0.successful_read_requests/]

+ 1 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -493,7 +493,7 @@ public class Node implements Closeable {
 
             final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
                 .filterPlugins(PersistentTaskPlugin.class).stream()
-                .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
+                .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))
                 .flatMap(List::stream)
                 .collect(toList());
 

+ 4 - 1
server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java

@@ -20,6 +20,7 @@ package org.elasticsearch.plugins;
 
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -35,7 +36,9 @@ public interface PersistentTaskPlugin {
      * Returns additional persistent tasks executors added by this plugin.
      */
     default List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
-                                                                        ThreadPool threadPool, Client client) {
+                                                                        ThreadPool threadPool,
+                                                                        Client client,
+                                                                        SettingsModule settingsModule) {
         return Collections.emptyList();
     }
 

+ 4 - 1
server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java

@@ -44,6 +44,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -89,7 +90,9 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
 
     @Override
     public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
-                                                                       ThreadPool threadPool, Client client) {
+                                                                       ThreadPool threadPool,
+                                                                       Client client,
+                                                                       SettingsModule settingsModule) {
         return Collections.singletonList(new TestPersistentTasksExecutor(clusterService));
     }
 

+ 6 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

@@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
@@ -149,8 +150,11 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
 
     @Override
     public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
-                                                                       ThreadPool threadPool, Client client) {
-        return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService));
+                                                                       ThreadPool threadPool,
+                                                                       Client client,
+                                                                       SettingsModule settingsModule) {
+        IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings();
+        return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings));
     }
 
     public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {

+ 56 - 12
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.single.shard.SingleShardRequest;
 import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.routing.ShardsIterator;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -205,6 +206,12 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             return mappingVersion;
         }
 
+        private long settingsVersion;
+
+        public long getSettingsVersion() {
+            return settingsVersion;
+        }
+
         private long globalCheckpoint;
 
         public long getGlobalCheckpoint() {
@@ -240,6 +247,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
 
         Response(
             final long mappingVersion,
+            final long settingsVersion,
             final long globalCheckpoint,
             final long maxSeqNo,
             final long maxSeqNoOfUpdatesOrDeletes,
@@ -247,6 +255,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             final long tookInMillis) {
 
             this.mappingVersion = mappingVersion;
+            this.settingsVersion = settingsVersion;
             this.globalCheckpoint = globalCheckpoint;
             this.maxSeqNo = maxSeqNo;
             this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
@@ -258,6 +267,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
         public void readFrom(final StreamInput in) throws IOException {
             super.readFrom(in);
             mappingVersion = in.readVLong();
+            settingsVersion = in.readVLong();
             globalCheckpoint = in.readZLong();
             maxSeqNo = in.readZLong();
             maxSeqNoOfUpdatesOrDeletes = in.readZLong();
@@ -269,6 +279,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
         public void writeTo(final StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeVLong(mappingVersion);
+            out.writeVLong(settingsVersion);
             out.writeZLong(globalCheckpoint);
             out.writeZLong(maxSeqNo);
             out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
@@ -282,6 +293,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             if (o == null || getClass() != o.getClass()) return false;
             final Response that = (Response) o;
             return mappingVersion == that.mappingVersion &&
+                    settingsVersion == that.settingsVersion &&
                     globalCheckpoint == that.globalCheckpoint &&
                     maxSeqNo == that.maxSeqNo &&
                     maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
@@ -291,8 +303,14 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
 
         @Override
         public int hashCode() {
-            return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes,
-                Arrays.hashCode(operations), tookInMillis);
+            return Objects.hash(
+                    mappingVersion,
+                    settingsVersion,
+                    globalCheckpoint,
+                    maxSeqNo,
+                    maxSeqNoOfUpdatesOrDeletes,
+                    Arrays.hashCode(operations),
+                    tookInMillis);
         }
     }
 
@@ -317,7 +335,9 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
             final IndexShard indexShard = indexService.getShard(request.getShard().id());
             final SeqNoStats seqNoStats = indexShard.seqNoStats();
-            final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
+            final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
+            final long mappingVersion = indexMetaData.getMappingVersion();
+            final long settingsVersion = indexMetaData.getSettingsVersion();
 
             final Translog.Operation[] operations = getOperations(
                     indexShard,
@@ -328,7 +348,13 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
                     request.getMaxBatchSize());
             // must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
             final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
-            return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations, request.relativeStartNanos);
+            return getResponse(
+                    mappingVersion,
+                    settingsVersion,
+                    seqNoStats,
+                    maxSeqNoOfUpdatesOrDeletes,
+                    operations,
+                    request.relativeStartNanos);
         }
 
         @Override
@@ -404,12 +430,19 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
                     e);
             if (e instanceof TimeoutException) {
                 try {
-                    final long mappingVersion =
-                            clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
+                    final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
+                    final long mappingVersion = indexMetaData.getMappingVersion();
+                    final long settingsVersion = indexMetaData.getSettingsVersion();
                     final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
                     final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
-                    listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY,
-                        request.relativeStartNanos));
+                    listener.onResponse(
+                            getResponse(
+                                    mappingVersion,
+                                    settingsVersion,
+                                    latestSeqNoStats,
+                                    maxSeqNoOfUpdatesOrDeletes,
+                                    EMPTY_OPERATIONS_ARRAY,
+                                    request.relativeStartNanos));
                 } catch (final Exception caught) {
                     caught.addSuppressed(e);
                     listener.onFailure(caught);
@@ -494,12 +527,23 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
         return operations.toArray(EMPTY_OPERATIONS_ARRAY);
     }
 
-    static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
-                                final long maxSeqNoOfUpdates, final Translog.Operation[] operations, long relativeStartNanos) {
+    static Response getResponse(
+            final long mappingVersion,
+            final long settingsVersion,
+            final SeqNoStats seqNoStats,
+            final long maxSeqNoOfUpdates,
+            final Translog.Operation[] operations,
+            long relativeStartNanos) {
         long tookInNanos = System.nanoTime() - relativeStartNanos;
         long tookInMillis = TimeUnit.NANOSECONDS.toMillis(tookInNanos);
-        return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates,
-            operations, tookInMillis);
+        return new Response(
+                mappingVersion,
+                settingsVersion,
+                seqNoStats.getGlobalCheckpoint(),
+                seqNoStats.getMaxSeqNo(),
+                maxSeqNoOfUpdates,
+                operations,
+                tookInMillis);
     }
 
 }

+ 50 - 5
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

@@ -20,8 +20,8 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.transport.NetworkExceptionHelper;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.index.translog.Translog;
@@ -74,6 +74,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     private int numOutstandingReads = 0;
     private int numOutstandingWrites = 0;
     private long currentMappingVersion = 0;
+    private long currentSettingsVersion = 0;
     private long totalReadRemoteExecTimeMillis = 0;
     private long totalReadTimeMillis = 0;
     private long successfulReadRequests = 0;
@@ -134,9 +135,19 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
             synchronized (ShardFollowNodeTask.this) {
                 currentMappingVersion = followerMappingVersion;
             }
-            LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, followerMappingVersion={}",
-                params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, followerMappingVersion);
-            coordinateReads();
+            updateSettings(followerSettingsVersion -> {
+                synchronized (ShardFollowNodeTask.this) {
+                    currentSettingsVersion = followerSettingsVersion;
+                }
+                LOGGER.info(
+                        "{} following leader shard {}, follower global checkpoint=[{}], mapping version=[{}], settings version=[{}]",
+                        params.getFollowShardId(),
+                        params.getLeaderShardId(),
+                        followerGlobalCheckpoint,
+                        followerMappingVersion,
+                        followerSettingsVersion);
+                coordinateReads();
+            });
         });
     }
 
@@ -269,7 +280,15 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     }
 
     void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {
-        maybeUpdateMapping(response.getMappingVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response));
+        // In order to process this read response (3), we need to check and potentially update the follow index's setting (1) and
+        // check and potentially update the follow index's mappings (2).
+
+        // 3) handle read response:
+        Runnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response);
+        // 2) update follow index mapping:
+        Runnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(), handleResponseTask);
+        // 1) update follow index settings:
+        maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
     }
 
     /** Called when some operations are fetched from the leading */
@@ -367,6 +386,21 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         }
     }
 
+    private synchronized void maybeUpdateSettings(final Long minimumRequiredSettingsVersion, Runnable task) {
+        if (currentSettingsVersion >= minimumRequiredSettingsVersion) {
+            LOGGER.trace("{} settings version [{}] is higher or equal than minimum required mapping version [{}]",
+                    params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion);
+            task.run();
+        } else {
+            LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]",
+                    params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion);
+            updateSettings(settingsVersion -> {
+                currentSettingsVersion = settingsVersion;
+                task.run();
+            });
+        }
+    }
+
     private void updateMapping(LongConsumer handler) {
         updateMapping(handler, new AtomicInteger(0));
     }
@@ -375,6 +409,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter)));
     }
 
+    private void updateSettings(final LongConsumer handler) {
+        updateSettings(handler, new AtomicInteger(0));
+    }
+
+    private void updateSettings(final LongConsumer handler, final AtomicInteger retryCounter) {
+        innerUpdateSettings(handler, e -> handleFailure(e, retryCounter, () -> updateSettings(handler, retryCounter)));
+    }
+
     private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
         assert e != null;
         if (shouldRetry(e) && isStopped() == false) {
@@ -424,6 +466,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     // These methods are protected for testing purposes:
     protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
 
+    protected abstract void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler);
+
     protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID,
                                                                 List<Translog.Operation> operations,
                                                                 long leaderMaxSeqNoOfUpdatesOrDeletes,
@@ -470,6 +514,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
                 buffer.size(),
                 bufferSizeInBytes,
                 currentMappingVersion,
+                currentSettingsVersion,
                 totalReadTimeMillis,
                 totalReadRemoteExecTimeMillis,
                 successfulReadRequests,

+ 89 - 3
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -8,16 +8,25 @@ package org.elasticsearch.xpack.ccr.action;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
+import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.settings.IndexScopedSettings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -56,12 +65,17 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
     private final Client client;
     private final ThreadPool threadPool;
     private final ClusterService clusterService;
+    private final IndexScopedSettings indexScopedSettings;
 
-    public ShardFollowTasksExecutor(Client client, ThreadPool threadPool, ClusterService clusterService) {
+    public ShardFollowTasksExecutor(Client client,
+                                    ThreadPool threadPool,
+                                    ClusterService clusterService,
+                                    IndexScopedSettings indexScopedSettings) {
         super(ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME);
         this.client = client;
         this.threadPool = threadPool;
         this.clusterService = clusterService;
+        this.indexScopedSettings = indexScopedSettings;
     }
 
     @Override
@@ -131,6 +145,79 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                 }, errorHandler));
             }
 
+            @Override
+            protected void innerUpdateSettings(final LongConsumer finalHandler, final Consumer<Exception> errorHandler) {
+                final Index leaderIndex = params.getLeaderShardId().getIndex();
+                final Index followIndex = params.getFollowShardId().getIndex();
+
+                final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
+                clusterStateRequest.clear();
+                clusterStateRequest.metaData(true);
+                clusterStateRequest.indices(leaderIndex.getName());
+
+                CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
+                    final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
+                    final IndexMetaData followerIMD = clusterService.state().metaData().getIndexSafe(followIndex);
+
+                    final Settings existingSettings = TransportResumeFollowAction.filter(followerIMD.getSettings());
+                    final Settings settings = TransportResumeFollowAction.filter(leaderIMD.getSettings());
+                    if (existingSettings.equals(settings)) {
+                        // If no settings have been changed then just propagate settings version to shard follow node task:
+                        finalHandler.accept(leaderIMD.getSettingsVersion());
+                    } else {
+                        // Figure out which settings have been updated:
+                        final Settings updatedSettings = settings.filter(
+                            s -> existingSettings.get(s) == null || existingSettings.get(s).equals(settings.get(s)) == false
+                        );
+
+                        // Figure out whether the updated settings are all dynamic settings and
+                        // if so just update the follower index's settings:
+                        if (updatedSettings.keySet().stream().allMatch(indexScopedSettings::isDynamicSetting)) {
+                            // If only dynamic settings have been updated then just update these settings in follower index:
+                            final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex.getName());
+                            updateSettingsRequest.settings(updatedSettings);
+                            followerClient.admin().indices().updateSettings(updateSettingsRequest,
+                                ActionListener.wrap(response -> finalHandler.accept(leaderIMD.getSettingsVersion()), errorHandler));
+                        } else {
+                            // If one or more setting are not dynamic then close follow index, update leader settings and
+                            // then open leader index:
+                            Runnable handler = () -> finalHandler.accept(leaderIMD.getSettingsVersion());
+                            closeIndexUpdateSettingsAndOpenIndex(followIndex.getName(), updatedSettings, handler, errorHandler);
+                        }
+                    }
+                };
+                leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
+            }
+
+            private void closeIndexUpdateSettingsAndOpenIndex(String followIndex,
+                                                              Settings updatedSettings,
+                                                              Runnable handler,
+                                                              Consumer<Exception> onFailure) {
+                CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex);
+                CheckedConsumer<AcknowledgedResponse, Exception> onResponse = response -> {
+                    updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure);
+                };
+                followerClient.admin().indices().close(closeRequest, ActionListener.wrap(onResponse, onFailure));
+            }
+
+            private void updateSettingsAndOpenIndex(String followIndex,
+                                                    Settings updatedSettings,
+                                                    Runnable handler,
+                                                    Consumer<Exception> onFailure) {
+                final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(followIndex);
+                updateSettingsRequest.settings(updatedSettings);
+                CheckedConsumer<AcknowledgedResponse, Exception> onResponse = response -> openIndex(followIndex, handler, onFailure);
+                followerClient.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(onResponse, onFailure));
+            }
+
+            private void openIndex(String followIndex,
+                                   Runnable handler,
+                                   Consumer<Exception> onFailure) {
+                OpenIndexRequest openIndexRequest = new OpenIndexRequest(followIndex);
+                CheckedConsumer<OpenIndexResponse, Exception> onResponse = response -> handler.run();
+                followerClient.admin().indices().open(openIndexRequest, ActionListener.wrap(onResponse, onFailure));
+            }
+
             @Override
             protected void innerSendBulkShardOperationsRequest(
                 final String followerHistoryUUID,
@@ -141,8 +228,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
 
                 final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(),
                     followerHistoryUUID, operations, maxSeqNoOfUpdatesOrDeletes);
-                followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
-                    ActionListener.wrap(response -> handler.accept(response), errorHandler));
+                followerClient.execute(BulkShardOperationsAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
             }
 
             @Override

+ 2 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java

@@ -385,10 +385,11 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
         WHITE_LISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings);
     }
 
-    private static Settings filter(Settings originalSettings) {
+    static Settings filter(Settings originalSettings) {
         Settings.Builder settings = Settings.builder().put(originalSettings);
         // Remove settings that are always going to be different between leader and follow index:
         settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());
+        settings.remove(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey());
         settings.remove(IndexMetaData.SETTING_INDEX_UUID);
         settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME);
         settings.remove(IndexMetaData.SETTING_CREATION_DATE);

+ 229 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

@@ -14,7 +14,12 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.bulk.BulkProcessor;
@@ -27,6 +32,7 @@ import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
@@ -64,6 +70,8 @@ import java.util.Objects;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -585,6 +593,227 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         }
     }
 
+    public void testUpdateDynamicLeaderIndexSettings() throws Exception {
+        final String leaderIndexSettings = getIndexSettings(1, 0,
+            singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
+        assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
+        ensureLeaderYellow("leader");
+
+        final PutFollowAction.Request followRequest = putFollow("leader", "follower");
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+        BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower");
+
+        final long firstBatchNumDocs = randomIntBetween(2, 64);
+        for (long i = 0; i < firstBatchNumDocs; i++) {
+            leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
+        }
+        assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs)));
+
+        // Sanity check that the setting has not been set in follower index:
+        {
+            GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
+            getSettingsRequest.indices("follower");
+            GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet();
+            assertThat(getSettingsResponse.getSetting("follower", "index.max_ngram_diff"), nullValue());
+        }
+        assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L));
+        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader");
+        updateSettingsRequest.settings(Settings.builder().put("index.max_ngram_diff", 2));
+        assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet());
+
+        final int secondBatchNumDocs = randomIntBetween(2, 64);
+        for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
+            leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
+        }
+        assertBusy(() -> {
+            // Check that the setting has been set in follower index:
+            GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
+            getSettingsRequest.indices("follower");
+            GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet();
+            assertThat(getSettingsResponse.getSetting("follower", "index.max_ngram_diff"), equalTo("2"));
+            assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L));
+
+            try {
+                assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits,
+                    equalTo(firstBatchNumDocs + secondBatchNumDocs));
+            } catch (Exception e) {
+                throw new AssertionError("error while searching", e);
+            }
+        });
+        assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(false));
+    }
+
+    public void testLeaderIndexSettingNotPercolatedToFollower() throws Exception {
+        // Sets an index setting on leader index that is excluded from being replicated to the follower index and
+        // expects that this setting is not replicated to the follower index, but does expect that the settings version
+        // is incremented.
+        final String leaderIndexSettings = getIndexSettings(1, 0,
+            singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
+        assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
+        ensureLeaderYellow("leader");
+
+        final PutFollowAction.Request followRequest = putFollow("leader", "follower");
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+        BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower");
+
+        final long firstBatchNumDocs = randomIntBetween(2, 64);
+        for (long i = 0; i < firstBatchNumDocs; i++) {
+            leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
+        }
+        assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs)));
+
+        // Sanity check that the setting has not been set in follower index:
+        {
+            GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
+            getSettingsRequest.indices("follower");
+            GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet();
+            assertThat(getSettingsResponse.getSetting("follower", "index.number_of_replicas"), equalTo("0"));
+        }
+        assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L));
+        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader");
+        updateSettingsRequest.settings(Settings.builder().put("index.number_of_replicas", 1));
+        assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet());
+
+        final int secondBatchNumDocs = randomIntBetween(2, 64);
+        for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
+            leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
+        }
+        assertBusy(() -> {
+            GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
+            getSettingsRequest.indices("follower");
+            GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet();
+            assertThat(getSettingsResponse.getSetting("follower", "index.number_of_replicas"), equalTo("0"));
+            assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L));
+
+            try {
+                assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits,
+                    equalTo(firstBatchNumDocs + secondBatchNumDocs));
+            } catch (Exception e) {
+                throw new AssertionError("error while searching", e);
+            }
+        });
+        assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(false));
+    }
+
+    public void testUpdateAnalysisLeaderIndexSettings() throws Exception {
+        final String leaderIndexSettings = getIndexSettings(1, 0,
+            singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
+        assertAcked(leaderClient().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
+        ensureLeaderYellow("leader");
+
+        final PutFollowAction.Request followRequest = putFollow("leader", "follower");
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+        BooleanSupplier hasFollowIndexBeenClosedChecker = hasFollowIndexBeenClosed("follower");
+
+        final long firstBatchNumDocs = randomIntBetween(2, 64);
+        for (long i = 0; i < firstBatchNumDocs; i++) {
+            leaderClient().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
+        }
+
+        assertBusy(() -> assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs)));
+        assertThat(getFollowTaskSettingsVersion("follower"), equalTo(1L));
+        assertThat(getFollowTaskMappingVersion("follower"), equalTo(1L));
+
+        CloseIndexRequest closeIndexRequest = new CloseIndexRequest("leader");
+        assertAcked(leaderClient().admin().indices().close(closeIndexRequest).actionGet());
+
+        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest("leader");
+        updateSettingsRequest.settings(Settings.builder()
+            .put("index.analysis.analyzer.my_analyzer.type", "custom")
+            .put("index.analysis.analyzer.my_analyzer.tokenizer", "keyword")
+        );
+        assertAcked(leaderClient().admin().indices().updateSettings(updateSettingsRequest).actionGet());
+
+        OpenIndexRequest openIndexRequest = new OpenIndexRequest("leader");
+        assertAcked(leaderClient().admin().indices().open(openIndexRequest).actionGet());
+        ensureLeaderGreen("leader");
+
+        PutMappingRequest putMappingRequest = new PutMappingRequest("leader");
+        putMappingRequest.type("doc");
+        putMappingRequest.source("new_field", "type=text,analyzer=my_analyzer");
+        assertAcked(leaderClient().admin().indices().putMapping(putMappingRequest).actionGet());
+
+        final int secondBatchNumDocs = randomIntBetween(2, 64);
+        for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
+            final String source = String.format(Locale.ROOT, "{\"new_field\":\"value %d\"}", i);
+            leaderClient().prepareIndex("leader", "doc").setSource(source, XContentType.JSON).get();
+        }
+
+        assertBusy(() -> {
+            assertThat(getFollowTaskSettingsVersion("follower"), equalTo(2L));
+            assertThat(getFollowTaskMappingVersion("follower"), equalTo(2L));
+
+            GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
+            getSettingsRequest.indices("follower");
+            GetSettingsResponse getSettingsResponse = followerClient().admin().indices().getSettings(getSettingsRequest).actionGet();
+            assertThat(getSettingsResponse.getSetting("follower", "index.analysis.analyzer.my_analyzer.type"), equalTo("custom"));
+            assertThat(getSettingsResponse.getSetting("follower", "index.analysis.analyzer.my_analyzer.tokenizer"), equalTo("keyword"));
+
+            GetMappingsRequest getMappingsRequest = new GetMappingsRequest();
+            getMappingsRequest.indices("follower");
+            GetMappingsResponse getMappingsResponse = followerClient().admin().indices().getMappings(getMappingsRequest).actionGet();
+            MappingMetaData mappingMetaData = getMappingsResponse.getMappings().get("follower").get("doc");
+            assertThat(XContentMapValues.extractValue("properties.new_field.type", mappingMetaData.sourceAsMap()), equalTo("text"));
+            assertThat(XContentMapValues.extractValue("properties.new_field.analyzer", mappingMetaData.sourceAsMap()),
+                equalTo("my_analyzer"));
+
+            try {
+                assertThat(followerClient().prepareSearch("follower").get().getHits().totalHits,
+                    equalTo(firstBatchNumDocs + secondBatchNumDocs));
+            } catch (Exception e) {
+                throw new AssertionError("error while searching", e);
+            }
+        });
+        assertThat(hasFollowIndexBeenClosedChecker.getAsBoolean(), is(true));
+    }
+
+    private long getFollowTaskSettingsVersion(String followerIndex) {
+        long settingsVersion = -1L;
+        for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {
+            if (settingsVersion == -1L) {
+                settingsVersion = status.followerSettingsVersion();
+            } else {
+                assert settingsVersion == status.followerSettingsVersion();
+            }
+        }
+        return settingsVersion;
+    }
+
+    private long getFollowTaskMappingVersion(String followerIndex) {
+        long mappingVersion = -1L;
+        for (ShardFollowNodeTaskStatus status : getFollowTaskStatuses(followerIndex)) {
+            if (mappingVersion == -1L) {
+                mappingVersion = status.followerMappingVersion();
+            } else {
+                assert mappingVersion == status.followerMappingVersion();
+            }
+        }
+        return mappingVersion;
+    }
+
+    private List<ShardFollowNodeTaskStatus> getFollowTaskStatuses(String followerIndex) {
+        FollowStatsAction.StatsRequest request = new StatsRequest();
+        request.setIndices(new String[]{followerIndex});
+        FollowStatsAction.StatsResponses response = followerClient().execute(FollowStatsAction.INSTANCE, request).actionGet();
+        return response.getStatsResponses().stream()
+            .map(FollowStatsAction.StatsResponse::status)
+            .filter(status -> status.followerIndex().equals(followerIndex))
+            .collect(Collectors.toList());
+    }
+
+    private BooleanSupplier hasFollowIndexBeenClosed(String indexName) {
+        String electedMasterNode = getFollowerCluster().getMasterName();
+        ClusterService clusterService = getFollowerCluster().getInstance(ClusterService.class, electedMasterNode);
+        AtomicBoolean closed = new AtomicBoolean(false);
+        clusterService.addListener(event -> {
+            IndexMetaData indexMetaData = event.state().metaData().index(indexName);
+            if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
+                closed.set(true);
+            }
+        });
+        return closed::get;
+    }
+
     private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
         return () -> {
             final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();

+ 2 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java

@@ -13,6 +13,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
     @Override
     protected ShardChangesAction.Response createTestInstance() {
         final long mappingVersion = randomNonNegativeLong();
+        final long settingsVersion = randomNonNegativeLong();
         final long leaderGlobalCheckpoint = randomNonNegativeLong();
         final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE);
         final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(-1, Long.MAX_VALUE);
@@ -23,6 +24,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
         }
         return new ShardChangesAction.Response(
             mappingVersion,
+            settingsVersion,
             leaderGlobalCheckpoint,
             leaderMaxSeqNo,
             maxSeqNoOfUpdatesOrDeletes,

+ 28 - 10
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java

@@ -40,14 +40,15 @@ import static org.hamcrest.Matchers.hasSize;
 public class ShardFollowNodeTaskRandomTests extends ESTestCase {
 
     public void testSingleReaderWriter() throws Exception {
-        TestRun testRun = createTestRun(randomNonNegativeLong(), randomNonNegativeLong(), randomIntBetween(1, 2048));
+        TestRun testRun = createTestRun(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
+            randomIntBetween(1, 2048));
         ShardFollowNodeTask task = createShardFollowTask(1, testRun);
         startAndAssertAndStopTask(task, testRun);
     }
 
     public void testMultipleReaderWriter() throws Exception {
         int concurrency = randomIntBetween(2, 8);
-        TestRun testRun = createTestRun(0, 0, between(1, 1024));
+        TestRun testRun = createTestRun(0, 0, 0, between(1, 1024));
         ShardFollowNodeTask task = createShardFollowTask(concurrency, testRun);
         startAndAssertAndStopTask(task, testRun);
     }
@@ -106,6 +107,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
 
             private volatile long mappingVersion = 0L;
+            private volatile long settingsVersion = 0L;
             private final Map<Long, Integer> fromToSlot = new HashMap<>();
 
             @Override
@@ -113,6 +115,11 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                 handler.accept(mappingVersion);
             }
 
+            @Override
+            protected void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler) {
+                handler.accept(settingsVersion);
+            }
+
             @Override
             protected void innerSendBulkShardOperationsRequest(
                 String followerHistoryUUID, List<Translog.Operation> operations,
@@ -153,6 +160,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                             // if too many invocations occur with the same from then AOBE occurs, this ok and then something is wrong.
                         }
                         mappingVersion = testResponse.mappingVersion;
+                        settingsVersion = testResponse.settingsVersion;
                         if (testResponse.exception != null) {
                             errorHandler.accept(testResponse.exception);
                         } else {
@@ -162,8 +170,8 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                         assert from >= testRun.finalExpectedGlobalCheckpoint;
                         final long globalCheckpoint = tracker.getCheckpoint();
                         final long maxSeqNo = tracker.getMaxSeqNo();
-                        handler.accept(new ShardChangesAction.Response(
-                            0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0], 1L));
+                        handler.accept(new ShardChangesAction.Response(0L, 0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(),
+                            new Translog.Operation[0], 1L));
                     }
                 };
                 threadPool.generic().execute(task);
@@ -206,9 +214,10 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
         };
     }
 
-    private static TestRun createTestRun(long startSeqNo, long startMappingVersion, int maxOperationCount) {
+    private static TestRun createTestRun(long startSeqNo, long startMappingVersion, long startSettingsVersion, int maxOperationCount) {
         long prevGlobalCheckpoint = startSeqNo;
         long mappingVersion = startMappingVersion;
+        long settingsVersion = startSettingsVersion;
         int numResponses = randomIntBetween(16, 256);
         Map<Long, List<TestResponse>> responses = new HashMap<>(numResponses);
         for (int i = 0; i < numResponses; i++) {
@@ -216,13 +225,16 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
             if (sometimes()) {
                 mappingVersion++;
             }
+            if (sometimes()) {
+                settingsVersion++;
+            }
 
             if (sometimes()) {
                 List<TestResponse> item = new ArrayList<>();
                 // Sometimes add a random retryable error
                 if (sometimes()) {
                     Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), "");
-                    item.add(new TestResponse(error, mappingVersion, null));
+                    item.add(new TestResponse(error, mappingVersion, settingsVersion, null));
                 }
                 List<Translog.Operation> ops = new ArrayList<>();
                 for (long seqNo = prevGlobalCheckpoint; seqNo <= nextGlobalCheckPoint; seqNo++) {
@@ -233,8 +245,10 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                 item.add(new TestResponse(
                     null,
                     mappingVersion,
+                    settingsVersion,
                     new ShardChangesAction.Response(
                         mappingVersion,
+                        settingsVersion,
                         nextGlobalCheckPoint,
                         nextGlobalCheckPoint,
                         randomNonNegativeLong(),
@@ -253,19 +267,20 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                     // Sometimes add a random retryable error
                     if (sometimes()) {
                         Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), "");
-                        item.add(new TestResponse(error, mappingVersion, null));
+                        item.add(new TestResponse(error, mappingVersion, settingsVersion, null));
                     }
                     // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind
                     if (sometimes()) {
                         ShardChangesAction.Response response = new ShardChangesAction.Response(
                             mappingVersion,
+                            settingsVersion,
                             prevGlobalCheckpoint,
                             prevGlobalCheckpoint,
                             randomNonNegativeLong(),
                             EMPTY,
                             randomNonNegativeLong()
                         );
-                        item.add(new TestResponse(null, mappingVersion, response));
+                        item.add(new TestResponse(null, mappingVersion, settingsVersion, response));
                     }
                     List<Translog.Operation> ops = new ArrayList<>();
                     for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
@@ -277,13 +292,14 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                     long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo;
                     ShardChangesAction.Response response = new ShardChangesAction.Response(
                         mappingVersion,
+                        settingsVersion,
                         localLeaderGCP,
                         localLeaderGCP,
                         randomNonNegativeLong(),
                         ops.toArray(EMPTY),
                         randomNonNegativeLong()
                     );
-                    item.add(new TestResponse(null, mappingVersion, response));
+                    item.add(new TestResponse(null, mappingVersion, settingsVersion, response));
                     responses.put(fromSeqNo, Collections.unmodifiableList(item));
                 }
             }
@@ -323,11 +339,13 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
 
         final Exception exception;
         final long mappingVersion;
+        final long settingsVersion;
         final ShardChangesAction.Response response;
 
-        private TestResponse(Exception exception, long mappingVersion, ShardChangesAction.Response response) {
+        private TestResponse(Exception exception, long mappingVersion, long settingsVersion, ShardChangesAction.Response response) {
             this.exception = exception;
             this.mappingVersion = mappingVersion;
+            this.settingsVersion = settingsVersion;
             this.response = response;
         }
     }

+ 2 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java

@@ -58,6 +58,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
                 randomNonNegativeLong(),
                 randomNonNegativeLong(),
                 randomNonNegativeLong(),
+                randomNonNegativeLong(),
                 randomReadExceptions(),
                 randomLong(),
                 randomBoolean() ? new ElasticsearchException("fatal error") : null);
@@ -78,6 +79,7 @@ public class ShardFollowNodeTaskStatusTests extends AbstractSerializingTestCase<
         assertThat(newInstance.outstandingWriteRequests(), equalTo(expectedInstance.outstandingWriteRequests()));
         assertThat(newInstance.writeBufferOperationCount(), equalTo(expectedInstance.writeBufferOperationCount()));
         assertThat(newInstance.followerMappingVersion(), equalTo(expectedInstance.followerMappingVersion()));
+        assertThat(newInstance.followerSettingsVersion(), equalTo(expectedInstance.followerSettingsVersion()));
         assertThat(newInstance.totalReadTimeMillis(), equalTo(expectedInstance.totalReadTimeMillis()));
         assertThat(newInstance.successfulReadRequests(), equalTo(expectedInstance.successfulReadRequests()));
         assertThat(newInstance.failedReadRequests(), equalTo(expectedInstance.failedReadRequests()));

+ 123 - 21
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

@@ -57,6 +57,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
     private Queue<Exception> writeFailures;
     private Queue<Exception> mappingUpdateFailures;
     private Queue<Long> mappingVersions;
+    private Queue<Exception> settingsUpdateFailures;
+    private Queue<Long> settingsVersions;
     private Queue<Long> leaderGlobalCheckpoints;
     private Queue<Long> followerGlobalCheckpoints;
     private Queue<Long> maxSeqNos;
@@ -73,7 +75,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         task.coordinateReads();
         assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request
         shardChangesRequests.clear();
-        task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 60L));
+        task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 60L));
         assertThat(shardChangesRequests, contains(new long[][]{
             {6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}}
         ));
@@ -98,7 +100,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
         shardChangesRequests.clear();
         // Also invokes the coordinatesReads() method:
-        task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L));
+        task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
         assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached
 
         ShardFollowNodeTaskStatus status = task.getStatus();
@@ -124,7 +126,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
         shardChangesRequests.clear();
         // Also invokes the coordinatesReads() method:
-        task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L));
+        task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
         assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached
 
         ShardFollowNodeTaskStatus status = task.getStatus();
@@ -189,7 +191,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         task.markAsCompleted();
         shardChangesRequests.clear();
         // Also invokes the coordinatesReads() method:
-        task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 31L));
+        task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 31L));
         assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled
         assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
 
@@ -219,7 +221,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         task.markAsCompleted();
         shardChangesRequests.clear();
         // Also invokes the coordinatesReads() method:
-        task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 128L));
+        task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
         assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled
         assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
 
@@ -439,7 +441,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         startTask(task, 63, -1);
 
         task.coordinateReads();
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
         task.innerHandleReadResponse(0L, 63L, response);
 
         assertThat(bulkShardOperationRequests.size(), equalTo(1));
@@ -469,7 +471,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
 
         shardChangesRequests.clear();
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 31L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 31L);
         task.innerHandleReadResponse(0L, 63L, response);
 
         assertThat(shardChangesRequests.size(), equalTo(1));
@@ -498,7 +500,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
         shardChangesRequests.clear();
         task.markAsCompleted();
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 31L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 31L);
         task.innerHandleReadResponse(0L, 64L, response);
 
         assertThat(shardChangesRequests.size(), equalTo(0));
@@ -524,7 +526,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
 
         shardChangesRequests.clear();
-        task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0], 1L));
+        task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 100, new Translog.Operation[0], 1L));
 
         assertThat(shardChangesRequests.size(), equalTo(1));
         assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
@@ -547,7 +549,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
         mappingVersions.add(1L);
         task.coordinateReads();
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L);
         task.handleReadResponse(0L, 63L, response);
 
         assertThat(bulkShardOperationRequests.size(), equalTo(1));
@@ -576,7 +578,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         }
         mappingVersions.add(1L);
         task.coordinateReads();
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L);
         task.handleReadResponse(0L, 63L, response);
 
         assertThat(mappingUpdateFailures.size(), equalTo(0));
@@ -601,7 +603,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
         mappingUpdateFailures.add(new RuntimeException());
         task.coordinateReads();
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 64L);
         task.handleReadResponse(0L, 64L, response);
 
         assertThat(bulkShardOperationRequests.size(), equalTo(0));
@@ -614,6 +616,85 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
     }
 
+    public void testSettingsUpdate() {
+        ShardFollowTaskParams params = new ShardFollowTaskParams();
+        params.maxReadRequestOperationCount = 64;
+        params.maxOutstandingReadRequests = 1;
+        params.maxOutstandingWriteRequests = 1;
+        ShardFollowNodeTask task = createShardFollowTask(params);
+        startTask(task, 63, -1);
+
+        settingsVersions.add(1L);
+        task.coordinateReads();
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L);
+        task.handleReadResponse(0L, 63L, response);
+
+        assertThat(bulkShardOperationRequests.size(), equalTo(1));
+        assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
+
+        ShardFollowNodeTaskStatus status = task.getStatus();
+        assertThat(status.followerMappingVersion(), equalTo(0L));
+        assertThat(status.followerSettingsVersion(), equalTo(1L));
+        assertThat(status.outstandingReadRequests(), equalTo(1));
+        assertThat(status.outstandingWriteRequests(), equalTo(1));
+        assertThat(status.lastRequestedSeqNo(), equalTo(63L));
+        assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
+        assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
+    }
+
+    public void testSettingsUpdateRetryableError() {
+        ShardFollowTaskParams params = new ShardFollowTaskParams();
+        params.maxReadRequestOperationCount = 64;
+        params.maxOutstandingReadRequests = 1;
+        params.maxOutstandingWriteRequests = 1;
+        ShardFollowNodeTask task = createShardFollowTask(params);
+        startTask(task, 63, -1);
+
+        int max = randomIntBetween(1, 30);
+        for (int i = 0; i < max; i++) {
+            settingsUpdateFailures.add(new ConnectException());
+        }
+        settingsVersions.add(1L);
+        task.coordinateReads();
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L);
+        task.handleReadResponse(0L, 63L, response);
+
+        assertThat(mappingUpdateFailures.size(), equalTo(0));
+        assertThat(bulkShardOperationRequests.size(), equalTo(1));
+        assertThat(task.isStopped(), equalTo(false));
+        ShardFollowNodeTaskStatus status = task.getStatus();
+        assertThat(status.followerMappingVersion(), equalTo(0L));
+        assertThat(status.followerSettingsVersion(), equalTo(1L));
+        assertThat(status.outstandingReadRequests(), equalTo(1));
+        assertThat(status.outstandingWriteRequests(), equalTo(1));
+        assertThat(status.lastRequestedSeqNo(), equalTo(63L));
+        assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
+    }
+
+    public void testSettingsUpdateNonRetryableError() {
+        ShardFollowTaskParams params = new ShardFollowTaskParams();
+        params.maxReadRequestOperationCount = 64;
+        params.maxOutstandingReadRequests = 1;
+        params.maxOutstandingWriteRequests = 1;
+        ShardFollowNodeTask task = createShardFollowTask(params);
+        startTask(task, 63, -1);
+
+        settingsUpdateFailures.add(new RuntimeException());
+        task.coordinateReads();
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 64L);
+        task.handleReadResponse(0L, 64L, response);
+
+        assertThat(bulkShardOperationRequests.size(), equalTo(0));
+        assertThat(task.isStopped(), equalTo(true));
+        ShardFollowNodeTaskStatus status = task.getStatus();
+        assertThat(status.followerMappingVersion(), equalTo(0L));
+        assertThat(status.followerSettingsVersion(), equalTo(0L));
+        assertThat(status.outstandingReadRequests(), equalTo(1));
+        assertThat(status.outstandingWriteRequests(), equalTo(0));
+        assertThat(status.lastRequestedSeqNo(), equalTo(63L));
+        assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
+    }
+
     public void testCoordinateWrites() {
         ShardFollowTaskParams params = new ShardFollowTaskParams();
         params.maxReadRequestOperationCount = 128;
@@ -629,7 +710,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
         assertThat(shardChangesRequests.get(0)[1], equalTo(128L));
 
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
         // Also invokes coordinatesWrites()
         task.innerHandleReadResponse(0L, 63L, response);
 
@@ -649,7 +730,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         params.maxWriteRequestOperationCount = 64;
         params.maxOutstandingWriteRequests = 2;
         ShardFollowNodeTask task = createShardFollowTask(params);
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
         // Also invokes coordinatesWrites()
         task.innerHandleReadResponse(0L, 64L, response);
 
@@ -662,7 +743,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
         params.maxOutstandingWriteRequests = 4; // change to 4 outstanding writers
         task = createShardFollowTask(params);
-        response = generateShardChangesResponse(0, 256, 0L, 256L);
+        response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
         // Also invokes coordinatesWrites()
         task.innerHandleReadResponse(0L, 64L, response);
 
@@ -681,7 +762,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         params.maxWriteRequestOperationCount = 8;
         params.maxOutstandingWriteRequests = 32;
         ShardFollowNodeTask task = createShardFollowTask(params);
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
         // Also invokes coordinatesWrites()
         task.innerHandleReadResponse(0L, 64L, response);
 
@@ -712,7 +793,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         for (int i = 0; i < max; i++) {
             writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
         }
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
         // Also invokes coordinatesWrites()
         task.innerHandleReadResponse(0L, 63L, response);
 
@@ -741,7 +822,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
 
         writeFailures.add(new RuntimeException());
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
         // Also invokes coordinatesWrites()
         task.innerHandleReadResponse(0L, 63L, response);
 
@@ -768,7 +849,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
         assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
 
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 64L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 64L);
         // Also invokes coordinatesWrites()
         task.innerHandleReadResponse(0L, 64L, response);
 
@@ -791,7 +872,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
         shardChangesRequests.clear();
         followerGlobalCheckpoints.add(63L);
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
+        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
         // Also invokes coordinatesWrites()
         task.innerHandleReadResponse(0L, 63L, response);
 
@@ -866,6 +947,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         writeFailures = new LinkedList<>();
         mappingUpdateFailures = new LinkedList<>();
         mappingVersions = new LinkedList<>();
+        settingsUpdateFailures = new LinkedList<>();
+        settingsVersions = new LinkedList<>();
         leaderGlobalCheckpoints = new LinkedList<>();
         followerGlobalCheckpoints = new LinkedList<>();
         maxSeqNos = new LinkedList<>();
@@ -887,6 +970,20 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
                 }
             }
 
+            @Override
+            protected void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler) {
+                Exception failure = settingsUpdateFailures.poll();
+                if (failure != null) {
+                    errorHandler.accept(failure);
+                    return;
+                }
+
+                final Long settingsVersion = settingsVersions.poll();
+                if (settingsVersion != null) {
+                    handler.accept(settingsVersion);
+                }
+            }
+
             @Override
             protected void innerSendBulkShardOperationsRequest(
                 String followerHistoryUUID, final List<Translog.Operation> operations,
@@ -924,6 +1021,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
                     }
                     final ShardChangesAction.Response response = new ShardChangesAction.Response(
                         mappingVersions.poll(),
+                        0L,
                         leaderGlobalCheckpoints.poll(),
                         maxSeqNos.poll(),
                         randomNonNegativeLong(),
@@ -946,7 +1044,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         };
     }
 
-    private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long mappingVersion,
+    private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo,
+                                                                            long toSeqNo,
+                                                                            long mappingVersion,
+                                                                            long settingsVersion,
                                                                             long leaderGlobalCheckPoint) {
         List<Translog.Operation> ops = new ArrayList<>();
         for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
@@ -956,6 +1057,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         }
         return new ShardChangesAction.Response(
             mappingVersion,
+            settingsVersion,
             leaderGlobalCheckPoint,
             leaderGlobalCheckPoint,
             randomNonNegativeLong(),

+ 8 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -402,6 +402,12 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                 handler.accept(1L);
             }
 
+            @Override
+            protected void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler) {
+                // no-op as settings updates are not tested here
+                handler.accept(1L);
+            }
+
             @Override
             protected void innerSendBulkShardOperationsRequest(
                 final String followerHistoryUUID,
@@ -432,7 +438,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                             final SeqNoStats seqNoStats = indexShard.seqNoStats();
                             final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
                             if (from > seqNoStats.getGlobalCheckpoint()) {
-                                handler.accept(ShardChangesAction.getResponse(1L, seqNoStats,
+                                handler.accept(ShardChangesAction.getResponse(1L, 1L, seqNoStats,
                                     maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY, 1L));
                                 return;
                             }
@@ -440,6 +446,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                                 maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxReadRequestSize());
                             // hard code mapping version; this is ok, as mapping updates are not tested here
                             final ShardChangesAction.Response response = new ShardChangesAction.Response(
+                                1L,
                                 1L,
                                 seqNoStats.getGlobalCheckpoint(),
                                 seqNoStats.getMaxSeqNo(),

+ 1 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java

@@ -55,6 +55,7 @@ public class StatsResponsesTests extends AbstractStreamableTestCase<FollowStatsA
                 randomNonNegativeLong(),
                 randomNonNegativeLong(),
                 randomNonNegativeLong(),
+                randomNonNegativeLong(),
                 Collections.emptyNavigableMap(),
                 randomLong(),
                 randomBoolean() ? new ElasticsearchException("fatal error") : null);

+ 5 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java

@@ -93,7 +93,8 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
         final int numberOfConcurrentWrites = randomIntBetween(1, Integer.MAX_VALUE);
         final int writeBufferOperationCount = randomIntBetween(0, Integer.MAX_VALUE);
         final long writeBufferSizeInBytes = randomNonNegativeLong();
-        final long followerMappingVersion = randomIntBetween(0, Integer.MAX_VALUE);
+        final long followerMappingVersion = randomNonNegativeLong();
+        final long followerSettingsVersion = randomNonNegativeLong();
         final long totalReadTimeMillis = randomLongBetween(0, 4096);
         final long totalReadRemoteExecTimeMillis = randomLongBetween(0, 4096);
         final long successfulReadRequests = randomNonNegativeLong();
@@ -124,6 +125,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
                 writeBufferOperationCount,
                 writeBufferSizeInBytes,
                 followerMappingVersion,
+                followerSettingsVersion,
                 totalReadTimeMillis,
                 totalReadRemoteExecTimeMillis,
                 successfulReadRequests,
@@ -170,6 +172,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
                                         + "\"write_buffer_operation_count\":" + writeBufferOperationCount + ","
                                         + "\"write_buffer_size_in_bytes\":" + writeBufferSizeInBytes + ","
                                         + "\"follower_mapping_version\":" + followerMappingVersion + ","
+                                        + "\"follower_settings_version\":" + followerSettingsVersion + ","
                                         + "\"total_read_time_millis\":" + totalReadTimeMillis + ","
                                         + "\"total_read_remote_exec_time_millis\":" + totalReadRemoteExecTimeMillis + ","
                                         + "\"successful_read_requests\":" + successfulReadRequests + ","
@@ -214,6 +217,7 @@ public class FollowStatsMonitoringDocTests extends BaseMonitoringDocTestCase<Fol
             1,
             1,
             1,
+            1,
             100,
             50,
             10,

+ 19 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java

@@ -48,6 +48,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
     private static final ParseField WRITE_BUFFER_OPERATION_COUNT_FIELD = new ParseField("write_buffer_operation_count");
     private static final ParseField WRITE_BUFFER_SIZE_IN_BYTES_FIELD = new ParseField("write_buffer_size_in_bytes");
     private static final ParseField FOLLOWER_MAPPING_VERSION_FIELD = new ParseField("follower_mapping_version");
+    private static final ParseField FOLLOWER_SETTINGS_VERSION_FIELD = new ParseField("follower_settings_version");
     private static final ParseField TOTAL_READ_TIME_MILLIS_FIELD = new ParseField("total_read_time_millis");
     private static final ParseField TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD = new ParseField("total_read_remote_exec_time_millis");
     private static final ParseField SUCCESSFUL_READ_REQUESTS_FIELD = new ParseField("successful_read_requests");
@@ -91,12 +92,13 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
                             (long) args[21],
                             (long) args[22],
                             (long) args[23],
+                            (long) args[24],
                             new TreeMap<>(
-                                    ((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[24])
+                                    ((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[25])
                                             .stream()
                                             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
-                            (long) args[25],
-                            (ElasticsearchException) args[26]));
+                            (long) args[26],
+                            (ElasticsearchException) args[27]));
 
     public static final String READ_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-read-exceptions-entry";
 
@@ -120,6 +122,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
         STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_OPERATION_COUNT_FIELD);
         STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), WRITE_BUFFER_SIZE_IN_BYTES_FIELD);
         STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAPPING_VERSION_FIELD);
+        STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_SETTINGS_VERSION_FIELD);
         STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_TIME_MILLIS_FIELD);
         STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_READ_REMOTE_EXEC_TIME_MILLIS_FIELD);
         STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), SUCCESSFUL_READ_REQUESTS_FIELD);
@@ -234,6 +237,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
         return followerMappingVersion;
     }
 
+    private final long followerSettingsVersion;
+
+    public long followerSettingsVersion() {
+        return followerSettingsVersion;
+    }
+
     private final long totalReadTimeMillis;
 
     public long totalReadTimeMillis() {
@@ -327,6 +336,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
             final int writeBufferOperationCount,
             final long writeBufferSizeInBytes,
             final long followerMappingVersion,
+            final long followerSettingsVersion,
             final long totalReadTimeMillis,
             final long totalReadRemoteExecTimeMillis,
             final long successfulReadRequests,
@@ -354,6 +364,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
         this.writeBufferOperationCount = writeBufferOperationCount;
         this.writeBufferSizeInBytes = writeBufferSizeInBytes;
         this.followerMappingVersion = followerMappingVersion;
+        this.followerSettingsVersion = followerSettingsVersion;
         this.totalReadTimeMillis = totalReadTimeMillis;
         this.totalReadRemoteExecTimeMillis = totalReadRemoteExecTimeMillis;
         this.successfulReadRequests = successfulReadRequests;
@@ -384,6 +395,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
         this.writeBufferOperationCount = in.readVInt();
         this.writeBufferSizeInBytes = in.readVLong();
         this.followerMappingVersion = in.readVLong();
+        this.followerSettingsVersion = in.readVLong();
         this.totalReadTimeMillis = in.readVLong();
         this.totalReadRemoteExecTimeMillis = in.readVLong();
         this.successfulReadRequests = in.readVLong();
@@ -421,6 +433,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
         out.writeVInt(writeBufferOperationCount);
         out.writeVLong(writeBufferSizeInBytes);
         out.writeVLong(followerMappingVersion);
+        out.writeVLong(followerSettingsVersion);
         out.writeVLong(totalReadTimeMillis);
         out.writeVLong(totalReadRemoteExecTimeMillis);
         out.writeVLong(successfulReadRequests);
@@ -470,6 +483,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
                 "write_buffer_size",
                 new ByteSizeValue(writeBufferSizeInBytes));
         builder.field(FOLLOWER_MAPPING_VERSION_FIELD.getPreferredName(), followerMappingVersion);
+        builder.field(FOLLOWER_SETTINGS_VERSION_FIELD.getPreferredName(), followerSettingsVersion);
         builder.humanReadableField(
                 TOTAL_READ_TIME_MILLIS_FIELD.getPreferredName(),
                 "total_read_time",
@@ -550,6 +564,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
                 writeBufferOperationCount == that.writeBufferOperationCount &&
                 writeBufferSizeInBytes == that.writeBufferSizeInBytes &&
                 followerMappingVersion == that.followerMappingVersion &&
+                followerSettingsVersion== that.followerSettingsVersion &&
                 totalReadTimeMillis == that.totalReadTimeMillis &&
                 totalReadRemoteExecTimeMillis == that.totalReadRemoteExecTimeMillis &&
                 successfulReadRequests == that.successfulReadRequests &&
@@ -588,6 +603,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
                 writeBufferOperationCount,
                 writeBufferSizeInBytes,
                 followerMappingVersion,
+                followerSettingsVersion,
                 totalReadTimeMillis,
                 totalReadRemoteExecTimeMillis,
                 successfulReadRequests,

+ 3 - 0
x-pack/plugin/core/src/main/resources/monitoring-es.json

@@ -971,6 +971,9 @@
             "follower_mapping_version": {
               "type": "long"
             },
+            "follower_settings_version": {
+              "type": "long"
+            },
             "total_read_time_millis": {
               "type": "long"
             },

+ 5 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -74,7 +75,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -384,9 +384,11 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
 
     @Override
     public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
-                                                                       ThreadPool threadPool, Client client) {
+                                                                       ThreadPool threadPool,
+                                                                       Client client,
+                                                                       SettingsModule settingsModule) {
         return filterPlugins(PersistentTaskPlugin.class).stream()
-                .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
+                .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule))
                 .flatMap(List::stream)
                 .collect(toList());
     }

+ 4 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -431,7 +432,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
     }
 
     public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
-                                                                       ThreadPool threadPool, Client client) {
+                                                                       ThreadPool threadPool,
+                                                                       Client client,
+                                                                       SettingsModule settingsModule) {
         if (enabled == false || transportClientMode) {
             return emptyList();
         }

+ 4 - 1
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java

@@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
@@ -189,7 +190,9 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
 
     @Override
     public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
-                                                                       ThreadPool threadPool, Client client) {
+                                                                       ThreadPool threadPool,
+                                                                       Client client,
+                                                                       SettingsModule settingsModule) {
         if (enabled == false || transportClientMode ) {
             return emptyList();
         }