Forráskód Böngészése

[CCR] Validate follower index historyUUIDs (#34078)

The follower index shard history UUID will be fetched from the indices stats api when the shard follow task starts and will be provided with the bulk shard operation requests. The bulk shard operations api will fail if the provided history uuid is unequal to the actual history uuid.

No longer record the leader history uuid in shard follow task params, but rather use the leader history UUIDs directly from follower index's custom metadata. The resume follow api will remain to fail if leader index shard history UUIDs are missing.

Closes #33956
Martijn van Groningen 7 éve
szülő
commit
7f5c2f1050
14 módosított fájl, 159 hozzáadás és 84 törlés
  1. 1 1
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
  2. 13 7
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java
  3. 3 16
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java
  4. 41 18
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java
  5. 1 1
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
  6. 8 9
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java
  7. 14 2
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java
  8. 9 3
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
  9. 5 6
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java
  10. 6 7
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java
  11. 56 11
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
  12. 0 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java
  13. 0 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java
  14. 2 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

@@ -148,7 +148,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
     @Override
     public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
                                                                        ThreadPool threadPool, Client client) {
-        return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool));
+        return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool, clusterService));
     }
 
     public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {

+ 13 - 7
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

@@ -62,6 +62,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     private final BiConsumer<TimeValue, Runnable> scheduler;
     private final LongSupplier relativeTimeProvider;
 
+    private String followerHistoryUUID;
     private long leaderGlobalCheckpoint;
     private long leaderMaxSeqNo;
     private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -110,15 +111,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     }
 
     void start(
-            final long leaderGlobalCheckpoint,
-            final long leaderMaxSeqNo,
-            final long followerGlobalCheckpoint,
-            final long followerMaxSeqNo) {
+        final String followerHistoryUUID,
+        final long leaderGlobalCheckpoint,
+        final long leaderMaxSeqNo,
+        final long followerGlobalCheckpoint,
+        final long followerMaxSeqNo) {
         /*
          * While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to
          * avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock.
          */
         synchronized (this) {
+            this.followerHistoryUUID = followerHistoryUUID;
             this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
             this.leaderMaxSeqNo = leaderMaxSeqNo;
             this.followerGlobalCheckpoint = followerGlobalCheckpoint;
@@ -305,7 +308,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
                                                 AtomicInteger retryCounter) {
         assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "mus is not replicated";
         final long startTime = relativeTimeProvider.getAsLong();
-        innerSendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes,
+        innerSendBulkShardOperationsRequest(followerHistoryUUID, operations, leaderMaxSeqNoOfUpdatesOrDeletes,
                 response -> {
                     synchronized (ShardFollowNodeTask.this) {
                         totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
@@ -404,8 +407,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     // These methods are protected for testing purposes:
     protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
 
-    protected abstract void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,
-                                    Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);
+    protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID,
+                                                                List<Translog.Operation> operations,
+                                                                long leaderMaxSeqNoOfUpdatesOrDeletes,
+                                                                Consumer<BulkShardOperationsResponse> handler,
+                                                                Consumer<Exception> errorHandler);
 
     protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
                                                          Consumer<Exception> errorHandler);

+ 3 - 16
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java

@@ -51,13 +51,12 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
     public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
     public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
     public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
-    public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid");
 
     @SuppressWarnings("unchecked")
     private static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
             (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]),
                     new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (ByteSizeValue) a[9],
-                (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (String) a[14], (Map<String, String>) a[15]));
+                (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map<String, String>) a[14]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD);
@@ -82,7 +81,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         PARSER.declareField(ConstructingObjectParser.constructorArg(),
             (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()),
                 POLL_TIMEOUT, ObjectParser.ValueType.STRING);
-        PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
     }
 
@@ -96,7 +94,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
     private final int maxWriteBufferSize;
     private final TimeValue maxRetryDelay;
     private final TimeValue pollTimeout;
-    private final String recordedLeaderIndexHistoryUUID;
     private final Map<String, String> headers;
 
     ShardFollowTask(
@@ -110,7 +107,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
             final int maxWriteBufferSize,
             final TimeValue maxRetryDelay,
             final TimeValue pollTimeout,
-            final String recordedLeaderIndexHistoryUUID,
             final Map<String, String> headers) {
         this.leaderClusterAlias = leaderClusterAlias;
         this.followShardId = followShardId;
@@ -122,7 +118,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         this.maxWriteBufferSize = maxWriteBufferSize;
         this.maxRetryDelay = maxRetryDelay;
         this.pollTimeout = pollTimeout;
-        this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID;
         this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
     }
 
@@ -137,7 +132,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         this.maxWriteBufferSize = in.readVInt();
         this.maxRetryDelay = in.readTimeValue();
         this.pollTimeout = in.readTimeValue();
-        this.recordedLeaderIndexHistoryUUID = in.readString();
         this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
     }
 
@@ -185,10 +179,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         return followShardId.getIndex().getUUID() + "-" + followShardId.getId();
     }
 
-    public String getRecordedLeaderIndexHistoryUUID() {
-        return recordedLeaderIndexHistoryUUID;
-    }
-
     public Map<String, String> getHeaders() {
         return headers;
     }
@@ -210,7 +200,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         out.writeVInt(maxWriteBufferSize);
         out.writeTimeValue(maxRetryDelay);
         out.writeTimeValue(pollTimeout);
-        out.writeString(recordedLeaderIndexHistoryUUID);
         out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
     }
 
@@ -237,7 +226,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
         builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
         builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
-        builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID);
         builder.field(HEADERS.getPreferredName(), headers);
         return builder.endObject();
     }
@@ -257,7 +245,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
                 maxWriteBufferSize == that.maxWriteBufferSize &&
                 Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
                 Objects.equals(pollTimeout, that.pollTimeout) &&
-                Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) &&
                 Objects.equals(headers, that.headers);
     }
 
@@ -274,8 +261,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
                 maxWriteBufferSize,
                 maxRetryDelay,
                 pollTimeout,
-                recordedLeaderIndexHistoryUUID,
-                headers);
+                headers
+        );
     }
 
     public String toString() {

+ 41 - 18
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -17,12 +17,15 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.engine.CommitStats;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -47,16 +50,19 @@ import java.util.function.Consumer;
 import java.util.function.LongConsumer;
 
 import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
+import static org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction.extractLeaderShardHistoryUUIDs;
 
 public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollowTask> {
 
     private final Client client;
     private final ThreadPool threadPool;
+    private final ClusterService clusterService;
 
-    public ShardFollowTasksExecutor(Settings settings, Client client, ThreadPool threadPool) {
+    public ShardFollowTasksExecutor(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService) {
         super(settings, ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME);
         this.client = client;
         this.threadPool = threadPool;
+        this.clusterService = clusterService;
     }
 
     @Override
@@ -99,8 +105,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                 }
             }
         };
-        return new ShardFollowNodeTask(
-                id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler, System::nanoTime) {
+
+        final String recordedLeaderShardHistoryUUID = getLeaderShardHistoryUUID(params);
+        return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params,
+            scheduler, System::nanoTime) {
 
             @Override
             protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
@@ -135,12 +143,14 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
 
             @Override
             protected void innerSendBulkShardOperationsRequest(
-                    final List<Translog.Operation> operations,
-                    final long maxSeqNoOfUpdatesOrDeletes,
-                    final Consumer<BulkShardOperationsResponse> handler,
-                    final Consumer<Exception> errorHandler) {
-                final BulkShardOperationsRequest request = new BulkShardOperationsRequest(
-                    params.getFollowShardId(), operations, maxSeqNoOfUpdatesOrDeletes);
+                final String followerHistoryUUID,
+                final List<Translog.Operation> operations,
+                final long maxSeqNoOfUpdatesOrDeletes,
+                final Consumer<BulkShardOperationsResponse> handler,
+                final Consumer<Exception> errorHandler) {
+
+                final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(),
+                    followerHistoryUUID, operations, maxSeqNoOfUpdatesOrDeletes);
                 followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
                     ActionListener.wrap(response -> handler.accept(response), errorHandler));
             }
@@ -149,7 +159,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
             protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
                                                         Consumer<Exception> errorHandler) {
                 ShardChangesAction.Request request =
-                    new ShardChangesAction.Request(params.getLeaderShardId(), params.getRecordedLeaderIndexHistoryUUID());
+                    new ShardChangesAction.Request(params.getLeaderShardId(), recordedLeaderShardHistoryUUID);
                 request.setFromSeqNo(from);
                 request.setMaxOperationCount(maxOperationCount);
                 request.setMaxBatchSize(params.getMaxBatchSize());
@@ -159,8 +169,15 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
         };
     }
 
-    interface BiLongConsumer {
-        void accept(long x, long y);
+    private String getLeaderShardHistoryUUID(ShardFollowTask params) {
+        IndexMetaData followIndexMetaData = clusterService.state().metaData().index(params.getFollowShardId().getIndex());
+        Map<String, String> ccrIndexMetadata = followIndexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
+        String[] recordedLeaderShardHistoryUUIDs = extractLeaderShardHistoryUUIDs(ccrIndexMetadata);
+        return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()];
+    }
+
+    interface FollowerStatsInfoHandler {
+        void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo);
     }
 
     @Override
@@ -169,7 +186,9 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
         ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
         logger.info("{} Starting to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId());
 
-        BiLongConsumer handler = (followerGCP, maxSeqNo) -> shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo);
+        FollowerStatsInfoHandler handler = (followerHistoryUUID, followerGCP, maxSeqNo) -> {
+            shardFollowNodeTask.start(followerHistoryUUID, followerGCP, maxSeqNo, followerGCP, maxSeqNo);
+        };
         Consumer<Exception> errorHandler = e -> {
             if (shardFollowNodeTask.isStopped()) {
                 return;
@@ -184,13 +203,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
             }
         };
 
-        fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), handler, errorHandler);
+        fetchFollowerShardInfo(followerClient, params.getFollowShardId(), handler, errorHandler);
     }
 
-    private void fetchGlobalCheckpoint(
+    private void fetchFollowerShardInfo(
             final Client client,
             final ShardId shardId,
-            final BiLongConsumer handler,
+            final FollowerStatsInfoHandler handler,
             final Consumer<Exception> errorHandler) {
         client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
             IndexStats indexStats = r.getIndex(shardId.getIndexName());
@@ -204,10 +223,14 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                     .filter(shardStats -> shardStats.getShardRouting().primary())
                     .findAny();
             if (filteredShardStats.isPresent()) {
-                final SeqNoStats seqNoStats = filteredShardStats.get().getSeqNoStats();
+                final ShardStats shardStats = filteredShardStats.get();
+                final CommitStats commitStats = shardStats.getCommitStats();
+                final String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY);
+
+                final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
                 final long globalCheckpoint = seqNoStats.getGlobalCheckpoint();
                 final long maxSeqNo = seqNoStats.getMaxSeqNo();
-                handler.accept(globalCheckpoint, maxSeqNo);
+                handler.accept(historyUUID, globalCheckpoint, maxSeqNo);
             } else {
                 errorHandler.accept(new ShardNotFoundException(shardId));
             }

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -174,7 +174,7 @@ public final class TransportPutFollowAction
                 listener::onFailure);
         // Can't use create index api here, because then index templates can alter the mappings / settings.
         // And index templates could introduce settings / mappings that are incompatible with the leader index.
-        clusterService.submitStateUpdateTask("follow_index_action", new AckedClusterStateUpdateTask<Boolean>(request, handler) {
+        clusterService.submitStateUpdateTask("create_following_index", new AckedClusterStateUpdateTask<Boolean>(request, handler) {
 
             @Override
             protected Boolean newResponse(final boolean acknowledged) {

+ 8 - 9
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java

@@ -192,12 +192,9 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
         for (int i = 0; i < numShards; i++) {
             final int shardId = i;
             String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
-            Map<String, String> ccrIndexMetadata = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
-            String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata);
-            String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId];
 
-            final ShardFollowTask shardFollowTask =  createShardFollowTask(shardId, clusterNameAlias, request,
-                leaderIndexMetadata, followIndexMetadata, recordedLeaderShardHistoryUUID, filteredHeaders);
+            final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
+                leaderIndexMetadata, followIndexMetadata, filteredHeaders);
             persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
                     new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
                         @Override
@@ -263,7 +260,7 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
                     "] as leader index but instead reference [" + recordedLeaderIndexUUID + "] as leader index");
         }
 
-        String[] recordedHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata);
+        String[] recordedHistoryUUIDs = extractLeaderShardHistoryUUIDs(ccrIndexMetadata);
         assert recordedHistoryUUIDs.length == leaderIndexHistoryUUID.length;
         for (int i = 0; i < leaderIndexHistoryUUID.length; i++) {
             String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs[i];
@@ -311,7 +308,6 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
         ResumeFollowAction.Request request,
         IndexMetaData leaderIndexMetadata,
         IndexMetaData followIndexMetadata,
-        String recordedLeaderShardHistoryUUID,
         Map<String, String> filteredHeaders
     ) {
         int maxBatchOperationCount;
@@ -363,13 +359,16 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
             maxWriteBufferSize,
             maxRetryDelay,
             pollTimeout,
-            recordedLeaderShardHistoryUUID,
             filteredHeaders
         );
     }
 
-    private static String[] extractIndexShardHistoryUUIDs(Map<String, String> ccrIndexMetaData) {
+    static String[] extractLeaderShardHistoryUUIDs(Map<String, String> ccrIndexMetaData) {
         String historyUUIDs = ccrIndexMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS);
+        if (historyUUIDs == null) {
+            throw new IllegalArgumentException("leader index shard UUIDs are missing");
+        }
+
         return historyUUIDs.split(",");
     }
 

+ 14 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java

@@ -16,19 +16,28 @@ import java.util.List;
 
 public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<BulkShardOperationsRequest> {
 
+    private String historyUUID;
     private List<Translog.Operation> operations;
     private long maxSeqNoOfUpdatesOrDeletes;
 
     public BulkShardOperationsRequest() {
     }
 
-    public BulkShardOperationsRequest(ShardId shardId, List<Translog.Operation> operations, long maxSeqNoOfUpdatesOrDeletes) {
+    public BulkShardOperationsRequest(final ShardId shardId,
+                                      final String historyUUID,
+                                      final List<Translog.Operation> operations,
+                                      long maxSeqNoOfUpdatesOrDeletes) {
         super(shardId);
         setRefreshPolicy(RefreshPolicy.NONE);
+        this.historyUUID = historyUUID;
         this.operations = operations;
         this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
     }
 
+    public String getHistoryUUID() {
+        return historyUUID;
+    }
+
     public List<Translog.Operation> getOperations() {
         return operations;
     }
@@ -40,6 +49,7 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<Bul
     @Override
     public void readFrom(final StreamInput in) throws IOException {
         super.readFrom(in);
+        historyUUID = in.readString();
         maxSeqNoOfUpdatesOrDeletes = in.readZLong();
         operations = in.readList(Translog.Operation::readOperation);
     }
@@ -47,6 +57,7 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<Bul
     @Override
     public void writeTo(final StreamOutput out) throws IOException {
         super.writeTo(out);
+        out.writeString(historyUUID);
         out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
         out.writeVInt(operations.size());
         for (Translog.Operation operation : operations) {
@@ -57,7 +68,8 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<Bul
     @Override
     public String toString() {
         return "BulkShardOperationsRequest{" +
-                "operations=" + operations.size()+
+                "historyUUID=" + historyUUID +
+                ", operations=" + operations.size() +
                 ", maxSeqNoUpdates=" + maxSeqNoOfUpdatesOrDeletes +
                 ", shardId=" + shardId +
                 ", timeout=" + timeout +

+ 9 - 3
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

@@ -61,17 +61,23 @@ public class TransportBulkShardOperationsAction
     @Override
     protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
             final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
-        return shardOperationOnPrimary(
-            request.shardId(), request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
+        return shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(),
+            request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
     }
 
     // public for testing purposes only
     public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
             final ShardId shardId,
+            final String historyUUID,
             final List<Translog.Operation> sourceOperations,
             final long maxSeqNoOfUpdatesOrDeletes,
             final IndexShard primary,
             final Logger logger) throws IOException {
+        if (historyUUID.equalsIgnoreCase(primary.getHistoryUUID()) == false) {
+            throw new IllegalStateException("unexpected history uuid, expected [" + historyUUID +
+                "], actual [" + primary.getHistoryUUID() + "], shard is likely restored from snapshot or force allocated");
+        }
+
         final List<Translog.Operation> targetOperations = sourceOperations.stream().map(operation -> {
             final Translog.Operation operationWithPrimaryTerm;
             switch (operation.opType()) {
@@ -110,7 +116,7 @@ public class TransportBulkShardOperationsAction
         primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
         final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY);
         final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
-            shardId, targetOperations, maxSeqNoOfUpdatesOrDeletes);
+            shardId, historyUUID, targetOperations, maxSeqNoOfUpdatesOrDeletes);
         return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
     }
 

+ 5 - 6
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java

@@ -51,7 +51,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
     }
 
     private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun) throws Exception {
-        task.start(testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1);
+        task.start("uuid", testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1);
         assertBusy(() -> {
             ShardFollowNodeTaskStatus status = task.getStatus();
             assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint));
@@ -85,7 +85,6 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
             10240,
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueMillis(10),
-            "uuid",
             Collections.emptyMap()
         );
 
@@ -111,10 +110,10 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
 
             @Override
             protected void innerSendBulkShardOperationsRequest(
-                    List<Translog.Operation> operations,
-                    long maxSeqNoOfUpdates,
-                    Consumer<BulkShardOperationsResponse> handler,
-                    Consumer<Exception> errorHandler) {
+                String followerHistoryUUID, List<Translog.Operation> operations,
+                long maxSeqNoOfUpdates,
+                Consumer<BulkShardOperationsResponse> handler,
+                Consumer<Exception> errorHandler) {
                 for(Translog.Operation op : operations) {
                     tracker.markSeqNoAsCompleted(op.seqNo());
                 }

+ 6 - 7
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

@@ -125,7 +125,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
         shardChangesRequests.clear();
         // The call the updateMapping is a noop, so noting happens.
-        task.start(128L, 128L, task.getStatus().followerGlobalCheckpoint(), task.getStatus().followerMaxSeqNo());
+        task.start("uuid", 128L, 128L, task.getStatus().followerGlobalCheckpoint(), task.getStatus().followerMaxSeqNo());
         task.markAsCompleted();
         task.coordinateReads();
         assertThat(shardChangesRequests.size(), equalTo(0));
@@ -682,7 +682,6 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
             bufferWriteLimit,
             TimeValue.ZERO,
             TimeValue.ZERO,
-            "uuid",
             Collections.emptyMap()
         );
 
@@ -715,10 +714,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
             @Override
             protected void innerSendBulkShardOperationsRequest(
-                    final List<Translog.Operation> operations,
-                    final long maxSeqNoOfUpdates,
-                    final Consumer<BulkShardOperationsResponse> handler,
-                    final Consumer<Exception> errorHandler) {
+                String followerHistoryUUID, final List<Translog.Operation> operations,
+                final long maxSeqNoOfUpdates,
+                final Consumer<BulkShardOperationsResponse> handler,
+                final Consumer<Exception> errorHandler) {
                 bulkShardOperationRequests.add(operations);
                 Exception writeFailure = ShardFollowNodeTaskTests.this.writeFailures.poll();
                 if (writeFailure != null) {
@@ -796,7 +795,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
     void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {
         // The call the updateMapping is a noop, so noting happens.
-        task.start(leaderGlobalCheckpoint, leaderGlobalCheckpoint, followerGlobalCheckpoint, followerGlobalCheckpoint);
+        task.start("uuid", leaderGlobalCheckpoint, leaderGlobalCheckpoint, followerGlobalCheckpoint, followerGlobalCheckpoint);
     }
 
 

+ 56 - 11
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -63,6 +63,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
             final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
             final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
             shardFollowTask.start(
+                    followerGroup.getPrimary().getHistoryUUID(),
                     leaderSeqNoStats.getGlobalCheckpoint(),
                     leaderSeqNoStats.getMaxSeqNo(),
                     followerSeqNoStats.getGlobalCheckpoint(),
@@ -103,6 +104,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
             final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
             final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
             shardFollowTask.start(
+                    followerGroup.getPrimary().getHistoryUUID(),
                     leaderSeqNoStats.getGlobalCheckpoint(),
                     leaderSeqNoStats.getMaxSeqNo(),
                     followerSeqNoStats.getGlobalCheckpoint(),
@@ -137,7 +139,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
         }
     }
 
-    public void testChangeHistoryUUID() throws Exception {
+    public void testChangeLeaderHistoryUUID() throws Exception {
         try (ReplicationGroup leaderGroup = createGroup(0);
              ReplicationGroup followerGroup = createFollowGroup(0)) {
             leaderGroup.startAll();
@@ -148,6 +150,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
             final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
             final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
             shardFollowTask.start(
+                followerGroup.getPrimary().getHistoryUUID(),
                 leaderSeqNoStats.getGlobalCheckpoint(),
                 leaderSeqNoStats.getMaxSeqNo(),
                 followerSeqNoStats.getGlobalCheckpoint(),
@@ -177,6 +180,47 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
         }
     }
 
+    public void testChangeFollowerHistoryUUID() throws Exception {
+        try (ReplicationGroup leaderGroup = createGroup(0);
+             ReplicationGroup followerGroup = createFollowGroup(0)) {
+            leaderGroup.startAll();
+            int docCount = leaderGroup.appendDocs(randomInt(64));
+            leaderGroup.assertAllEqual(docCount);
+            followerGroup.startAll();
+            ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
+            final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats();
+            final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
+            shardFollowTask.start(
+                followerGroup.getPrimary().getHistoryUUID(),
+                leaderSeqNoStats.getGlobalCheckpoint(),
+                leaderSeqNoStats.getMaxSeqNo(),
+                followerSeqNoStats.getGlobalCheckpoint(),
+                followerSeqNoStats.getMaxSeqNo());
+            leaderGroup.syncGlobalCheckpoint();
+            leaderGroup.assertAllEqual(docCount);
+            Set<String> indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary());
+            assertBusy(() -> {
+                assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
+                followerGroup.assertAllEqual(indexedDocIds.size());
+            });
+
+            String oldHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
+            followerGroup.reinitPrimaryShard();
+            followerGroup.getPrimary().store().bootstrapNewHistory();
+            recoverShardFromStore(followerGroup.getPrimary());
+            String newHistoryUUID = followerGroup.getPrimary().getHistoryUUID();
+
+            // force the global checkpoint on the leader to advance
+            leaderGroup.appendDocs(64);
+
+            assertBusy(() -> {
+                assertThat(shardFollowTask.isStopped(), is(true));
+                assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
+                    "], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated"));
+            });
+        }
+    }
+
     @Override
     protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
         Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
@@ -217,9 +261,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
             between(1, 4), 10240,
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueMillis(10),
-            leaderGroup.getPrimary().getHistoryUUID(),
             Collections.emptyMap()
         );
+        final String recordedLeaderIndexHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
 
         BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
         AtomicBoolean stopped = new AtomicBoolean(false);
@@ -245,13 +289,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
 
             @Override
             protected void innerSendBulkShardOperationsRequest(
-                    final List<Translog.Operation> operations,
-                    final long maxSeqNoOfUpdates,
-                    final Consumer<BulkShardOperationsResponse> handler,
-                    final Consumer<Exception> errorHandler) {
+                final String followerHistoryUUID,
+                final List<Translog.Operation> operations,
+                final long maxSeqNoOfUpdates,
+                final Consumer<BulkShardOperationsResponse> handler,
+                final Consumer<Exception> errorHandler) {
                 Runnable task = () -> {
-                    BulkShardOperationsRequest request = new BulkShardOperationsRequest(
-                        params.getFollowShardId(), operations, maxSeqNoOfUpdates);
+                    BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(),
+                        followerHistoryUUID, operations, maxSeqNoOfUpdates);
                     ActionListener<BulkShardOperationsResponse> listener = ActionListener.wrap(handler::accept, errorHandler);
                     new CCRAction(request, listener, followerGroup).execute();
                 };
@@ -277,7 +322,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                                 return;
                             }
                             Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
-                                maxOperationCount, params.getRecordedLeaderIndexHistoryUUID(), params.getMaxBatchSize());
+                                maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxBatchSize());
                             // hard code mapping version; this is ok, as mapping updates are not tested here
                             final ShardChangesAction.Response response = new ShardChangesAction.Response(
                                 1L,
@@ -340,8 +385,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
         @Override
         protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception {
             TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
-                TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getOperations(),
-                    request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
+                TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
+                    request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
             return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
         }
 

+ 0 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java

@@ -36,7 +36,6 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollo
             randomIntBetween(1, Integer.MAX_VALUE),
             TimeValue.parseTimeValue(randomTimeValue(), ""),
             TimeValue.parseTimeValue(randomTimeValue(), ""),
-            randomAlphaOfLength(4),
             randomBoolean() ? null : Collections.singletonMap("key", "value")
         );
     }

+ 0 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java

@@ -83,7 +83,6 @@ public class TransportUnfollowActionTests extends ESTestCase {
             10240,
             TimeValue.timeValueMillis(10),
             TimeValue.timeValueMillis(10),
-            "uuid",
             Collections.emptyMap()
         );
         PersistentTasksCustomMetaData.PersistentTask<?> task =

+ 2 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java

@@ -59,7 +59,8 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
         }
 
         final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
-            TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations,
+            TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), followerPrimary.getHistoryUUID(),
+                    operations,
                 numOps - 1, followerPrimary, logger);
 
         try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {