Browse Source

CCR: replicates max seq_no of updates to follower (#34051)

This commit replicates the max_seq_no_of_updates on the leading index
to the primaries of the following index via ShardFollowNodeTask. The
max_seq_of_updates is then transmitted to the replicas of the follower
via replication requests (that's BulkShardOperationsRequest).

Relates #33656
Nhat Nguyen 7 years ago
parent
commit
48c169e065

+ 20 - 5
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

@@ -207,6 +207,12 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             return maxSeqNo;
         }
 
+        private long maxSeqNoOfUpdatesOrDeletes;
+
+        public long getMaxSeqNoOfUpdatesOrDeletes() {
+            return maxSeqNoOfUpdatesOrDeletes;
+        }
+
         private Translog.Operation[] operations;
 
         public Translog.Operation[] getOperations() {
@@ -220,11 +226,13 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             final long mappingVersion,
             final long globalCheckpoint,
             final long maxSeqNo,
+            final long maxSeqNoOfUpdatesOrDeletes,
             final Translog.Operation[] operations) {
 
             this.mappingVersion = mappingVersion;
             this.globalCheckpoint = globalCheckpoint;
             this.maxSeqNo = maxSeqNo;
+            this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
             this.operations = operations;
         }
 
@@ -234,6 +242,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             mappingVersion = in.readVLong();
             globalCheckpoint = in.readZLong();
             maxSeqNo = in.readZLong();
+            maxSeqNoOfUpdatesOrDeletes = in.readZLong();
             operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
         }
 
@@ -243,6 +252,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             out.writeVLong(mappingVersion);
             out.writeZLong(globalCheckpoint);
             out.writeZLong(maxSeqNo);
+            out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
             out.writeArray(Translog.Operation::writeOperation, operations);
         }
 
@@ -254,12 +264,13 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             return mappingVersion == that.mappingVersion &&
                     globalCheckpoint == that.globalCheckpoint &&
                     maxSeqNo == that.maxSeqNo &&
+                    maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
                     Arrays.equals(operations, that.operations);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations));
+            return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations));
         }
     }
 
@@ -294,7 +305,9 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
                     request.getMaxOperationCount(),
                     request.getExpectedHistoryUUID(),
                     request.getMaxOperationSizeInBytes());
-            return getResponse(mappingVersion, seqNoStats, operations);
+            // must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
+            final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
+            return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations);
         }
 
         @Override
@@ -358,7 +371,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
                     final long mappingVersion =
                             clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
                     final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
-                    listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY));
+                    final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
+                    listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY));
                 } catch (final Exception caught) {
                     caught.addSuppressed(e);
                     listener.onFailure(caught);
@@ -433,8 +447,9 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
         return operations.toArray(EMPTY_OPERATIONS_ARRAY);
     }
 
-    static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) {
-        return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
+    static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
+                                final long maxSeqNoOfUpdates, final Translog.Operation[] operations) {
+        return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations);
     }
 
 }

+ 12 - 10
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.transport.NetworkExceptionHelper;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -56,6 +57,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
 
     private long leaderGlobalCheckpoint;
     private long leaderMaxSeqNo;
+    private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
     private long lastRequestedSeqNo;
     private long followerGlobalCheckpoint = 0;
     private long followerMaxSeqNo = 0;
@@ -201,7 +203,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
             numConcurrentWrites++;
             LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(),
                 ops.get(ops.size() - 1).seqNo(), ops.size());
-            sendBulkShardOperationsRequest(ops);
+            sendBulkShardOperationsRequest(ops, leaderMaxSeqNoOfUpdatesOrDeletes, new AtomicInteger(0));
         }
     }
 
@@ -262,6 +264,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         onOperationsFetched(response.getOperations());
         leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
         leaderMaxSeqNo = Math.max(leaderMaxSeqNo, response.getMaxSeqNo());
+        leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.max(leaderMaxSeqNoOfUpdatesOrDeletes, response.getMaxSeqNoOfUpdatesOrDeletes());
         final long newFromSeqNo;
         if (response.getOperations().length == 0) {
             newFromSeqNo = from;
@@ -291,13 +294,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         }
     }
 
-    private void sendBulkShardOperationsRequest(List<Translog.Operation> operations) {
-        sendBulkShardOperationsRequest(operations, new AtomicInteger(0));
-    }
-
-    private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, AtomicInteger retryCounter) {
+    private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,
+                                                AtomicInteger retryCounter) {
+        assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "mus is not replicated";
         final long startTime = relativeTimeProvider.getAsLong();
-        innerSendBulkShardOperationsRequest(operations,
+        innerSendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes,
                 response -> {
                     synchronized (ShardFollowNodeTask.this) {
                         totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
@@ -311,7 +312,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
                         totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
                         numberOfFailedBulkOperations++;
                     }
-                    handleFailure(e, retryCounter, () -> sendBulkShardOperationsRequest(operations, retryCounter));
+                    handleFailure(e, retryCounter,
+                        () -> sendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes, retryCounter));
                 }
         );
     }
@@ -383,8 +385,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
     // These methods are protected for testing purposes:
     protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
 
-    protected abstract void innerSendBulkShardOperationsRequest(
-            List<Translog.Operation> operations, Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);
+    protected abstract void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,
+                                    Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);
 
     protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
                                                          Consumer<Exception> errorHandler);

+ 3 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -133,9 +133,11 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
             @Override
             protected void innerSendBulkShardOperationsRequest(
                     final List<Translog.Operation> operations,
+                    final long maxSeqNoOfUpdatesOrDeletes,
                     final Consumer<BulkShardOperationsResponse> handler,
                     final Consumer<Exception> errorHandler) {
-                final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations);
+                final BulkShardOperationsRequest request = new BulkShardOperationsRequest(
+                    params.getFollowShardId(), operations, maxSeqNoOfUpdatesOrDeletes);
                 followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
                     ActionListener.wrap(response -> handler.accept(response), errorHandler));
             }

+ 10 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java

@@ -17,29 +17,37 @@ import java.util.List;
 public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<BulkShardOperationsRequest> {
 
     private List<Translog.Operation> operations;
+    private long maxSeqNoOfUpdatesOrDeletes;
 
     public BulkShardOperationsRequest() {
     }
 
-    public BulkShardOperationsRequest(final ShardId shardId, final List<Translog.Operation> operations) {
+    public BulkShardOperationsRequest(ShardId shardId, List<Translog.Operation> operations, long maxSeqNoOfUpdatesOrDeletes) {
         super(shardId);
         setRefreshPolicy(RefreshPolicy.NONE);
         this.operations = operations;
+        this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
     }
 
     public List<Translog.Operation> getOperations() {
         return operations;
     }
 
+    public long getMaxSeqNoOfUpdatesOrDeletes() {
+        return maxSeqNoOfUpdatesOrDeletes;
+    }
+
     @Override
     public void readFrom(final StreamInput in) throws IOException {
         super.readFrom(in);
+        maxSeqNoOfUpdatesOrDeletes = in.readZLong();
         operations = in.readList(Translog.Operation::readOperation);
     }
 
     @Override
     public void writeTo(final StreamOutput out) throws IOException {
         super.writeTo(out);
+        out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
         out.writeVInt(operations.size());
         for (Translog.Operation operation : operations) {
             Translog.Operation.writeOperation(out, operation);
@@ -50,6 +58,7 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<Bul
     public String toString() {
         return "BulkShardOperationsRequest{" +
                 "operations=" + operations.size()+
+                ", maxSeqNoUpdates=" + maxSeqNoOfUpdatesOrDeletes +
                 ", shardId=" + shardId +
                 ", timeout=" + timeout +
                 ", index='" + index + '\'' +

+ 10 - 4
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

@@ -18,6 +18,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
@@ -60,13 +61,15 @@ public class TransportBulkShardOperationsAction
     @Override
     protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
             final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
-        return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger);
+        return shardOperationOnPrimary(
+            request.shardId(), request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
     }
 
     // public for testing purposes only
     public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
             final ShardId shardId,
             final List<Translog.Operation> sourceOperations,
+            final long maxSeqNoOfUpdatesOrDeletes,
             final IndexShard primary,
             final Logger logger) throws IOException {
         final List<Translog.Operation> targetOperations = sourceOperations.stream().map(operation -> {
@@ -103,16 +106,19 @@ public class TransportBulkShardOperationsAction
             }
             return operationWithPrimaryTerm;
         }).collect(Collectors.toList());
-        // TODO: Replace this artificial value by the actual max_seq_no_updates from the leader
-        targetOperations.stream().mapToLong(Translog.Operation::seqNo).max().ifPresent(primary::advanceMaxSeqNoOfUpdatesOrDeletes);
+        assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]";
+        primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
         final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY);
-        final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations);
+        final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
+            shardId, targetOperations, maxSeqNoOfUpdatesOrDeletes);
         return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
     }
 
     @Override
     protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
             final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
+        assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() :
+            "mus on replica [" + replica + "] < mus of request [" + request.getMaxSeqNoOfUpdatesOrDeletes() + "]";
         final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA);
         return new WriteReplicaResult<>(request, location, null, replica, logger);
     }

+ 52 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.ccr;
 
+import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@@ -30,11 +31,15 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -59,6 +64,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -221,6 +227,7 @@ public class ShardChangesIT extends ESIntegTestCase {
             assertBusy(assertExpectedDocumentRunnable(i));
         }
         unfollowIndex("index2");
+        assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards);
     }
 
     public void testSyncMappings() throws Exception {
@@ -258,6 +265,7 @@ public class ShardChangesIT extends ESIntegTestCase {
         assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer"));
         assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
         unfollowIndex("index2");
+        assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 2);
     }
 
     public void testNoMappingDefined() throws Exception {
@@ -284,7 +292,8 @@ public class ShardChangesIT extends ESIntegTestCase {
     }
 
     public void testFollowIndex_backlog() throws Exception {
-        String leaderIndexSettings = getIndexSettings(between(1, 5), between(0, 1),
+        int numberOfShards = between(1, 5);
+        String leaderIndexSettings = getIndexSettings(numberOfShards, between(0, 1),
             singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
         assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
         BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@@ -334,6 +343,7 @@ public class ShardChangesIT extends ESIntegTestCase {
 
         assertSameDocCount("index1", "index2");
         unfollowIndex("index2");
+        assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards);
     }
 
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337")
@@ -379,6 +389,7 @@ public class ShardChangesIT extends ESIntegTestCase {
 
         assertSameDocCount("index1", "index2");
         unfollowIndex("index2");
+        assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 3);
     }
 
     public void testFollowIndexWithNestedField() throws Exception {
@@ -419,6 +430,7 @@ public class ShardChangesIT extends ESIntegTestCase {
             });
         }
         unfollowIndex("index2");
+        assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
     }
 
     public void testUnfollowNonExistingIndex() {
@@ -482,6 +494,7 @@ public class ShardChangesIT extends ESIntegTestCase {
             assertBusy(assertExpectedDocumentRunnable(i));
         }
         unfollowIndex("index2");
+        assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
     }
 
     public void testDontFollowTheWrongIndex() throws Exception {
@@ -715,6 +728,44 @@ public class ShardChangesIT extends ESIntegTestCase {
         }, 60, TimeUnit.SECONDS);
     }
 
+    private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index followerIndex, int numberOfShards) throws Exception {
+        assertBusy(() -> {
+            long[] msuOnLeader = new long[numberOfShards];
+            for (int i = 0; i < msuOnLeader.length; i++) {
+                msuOnLeader[i] = SequenceNumbers.UNASSIGNED_SEQ_NO;
+            }
+            Set<String> leaderNodes = internalCluster().nodesInclude(leaderIndex.getName());
+            for (String leaderNode : leaderNodes) {
+                IndicesService indicesService = internalCluster().getInstance(IndicesService.class, leaderNode);
+                for (int i = 0; i < numberOfShards; i++) {
+                    IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i));
+                    if (shard != null) {
+                        try {
+                            msuOnLeader[i] = SequenceNumbers.max(msuOnLeader[i], shard.getMaxSeqNoOfUpdatesOrDeletes());
+                        } catch (AlreadyClosedException ignored) {
+                            return;
+                        }
+                    }
+                }
+            }
+
+            Set<String> followerNodes = internalCluster().nodesInclude(followerIndex.getName());
+            for (String followerNode : followerNodes) {
+                IndicesService indicesService = internalCluster().getInstance(IndicesService.class, followerNode);
+                for (int i = 0; i < numberOfShards; i++) {
+                    IndexShard shard = indicesService.getShardOrNull(new ShardId(leaderIndex, i));
+                    if (shard != null) {
+                        try {
+                            assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(msuOnLeader[i]));
+                        } catch (AlreadyClosedException ignored) {
+
+                        }
+                    }
+                }
+            }
+        });
+    }
+
     public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) {
         FollowIndexAction.Request request = new FollowIndexAction.Request();
         request.setLeaderIndex(leaderIndex);

+ 2 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java

@@ -15,6 +15,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
         final long mappingVersion = randomNonNegativeLong();
         final long leaderGlobalCheckpoint = randomNonNegativeLong();
         final long leaderMaxSeqNo = randomLongBetween(leaderGlobalCheckpoint, Long.MAX_VALUE);
+        final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(-1, Long.MAX_VALUE);
         final int numOps = randomInt(8);
         final Translog.Operation[] operations = new Translog.Operation[numOps];
         for (int i = 0; i < numOps; i++) {
@@ -24,6 +25,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
             mappingVersion,
             leaderGlobalCheckpoint,
             leaderMaxSeqNo,
+            maxSeqNoOfUpdatesOrDeletes,
             operations
         );
     }

+ 6 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java

@@ -112,6 +112,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
             @Override
             protected void innerSendBulkShardOperationsRequest(
                     List<Translog.Operation> operations,
+                    long maxSeqNoOfUpdates,
                     Consumer<BulkShardOperationsResponse> handler,
                     Consumer<Exception> errorHandler) {
                 for(Translog.Operation op : operations) {
@@ -157,7 +158,8 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                         assert from >= testRun.finalExpectedGlobalCheckpoint;
                         final long globalCheckpoint = tracker.getCheckpoint();
                         final long maxSeqNo = tracker.getMaxSeqNo();
-                        handler.accept(new ShardChangesAction.Response(0L,globalCheckpoint, maxSeqNo, new Translog.Operation[0]));
+                        handler.accept(new ShardChangesAction.Response(
+                            0L, globalCheckpoint, maxSeqNo, randomNonNegativeLong(), new Translog.Operation[0]));
                     }
                 };
                 threadPool.generic().execute(task);
@@ -231,6 +233,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                         mappingVersion,
                         nextGlobalCheckPoint,
                         nextGlobalCheckPoint,
+                        randomNonNegativeLong(),
                         ops.toArray(EMPTY))
                     )
                 );
@@ -253,6 +256,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                             mappingVersion,
                             prevGlobalCheckpoint,
                             prevGlobalCheckpoint,
+                            randomNonNegativeLong(),
                             EMPTY
                         );
                         item.add(new TestResponse(null, mappingVersion, response));
@@ -269,6 +273,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
                         mappingVersion,
                         localLeaderGCP,
                         localLeaderGCP,
+                        randomNonNegativeLong(),
                         ops.toArray(EMPTY)
                     );
                     item.add(new TestResponse(null, mappingVersion, response));

+ 4 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

@@ -407,7 +407,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
 
         shardChangesRequests.clear();
-        task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, new Translog.Operation[0]));
+        task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 100, new Translog.Operation[0]));
 
         assertThat(shardChangesRequests.size(), equalTo(1));
         assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
@@ -714,6 +714,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
             @Override
             protected void innerSendBulkShardOperationsRequest(
                     final List<Translog.Operation> operations,
+                    final long maxSeqNoOfUpdates,
                     final Consumer<BulkShardOperationsResponse> handler,
                     final Consumer<Exception> errorHandler) {
                 bulkShardOperationRequests.add(operations);
@@ -749,6 +750,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
                         mappingVersions.poll(),
                         leaderGlobalCheckpoints.poll(),
                         maxSeqNos.poll(),
+                        randomNonNegativeLong(),
                         operations
                     );
                     handler.accept(response);
@@ -785,6 +787,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
             mappingVersion,
             leaderGlobalCheckPoint,
             leaderGlobalCheckPoint,
+            randomNonNegativeLong(),
             ops.toArray(new Translog.Operation[0])
         );
     }

+ 11 - 3
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -240,10 +240,12 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
             @Override
             protected void innerSendBulkShardOperationsRequest(
                     final List<Translog.Operation> operations,
+                    final long maxSeqNoOfUpdates,
                     final Consumer<BulkShardOperationsResponse> handler,
                     final Consumer<Exception> errorHandler) {
                 Runnable task = () -> {
-                    BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations);
+                    BulkShardOperationsRequest request = new BulkShardOperationsRequest(
+                        params.getFollowShardId(), operations, maxSeqNoOfUpdates);
                     ActionListener<BulkShardOperationsResponse> listener = ActionListener.wrap(handler::accept, errorHandler);
                     new CCRAction(request, listener, followerGroup).execute();
                 };
@@ -262,8 +264,10 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                     for (IndexShard indexShard : indexShards) {
                         try {
                             final SeqNoStats seqNoStats = indexShard.seqNoStats();
+                            final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
                             if (from > seqNoStats.getGlobalCheckpoint()) {
-                                handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, ShardChangesAction.EMPTY_OPERATIONS_ARRAY));
+                                handler.accept(ShardChangesAction.getResponse(1L, seqNoStats,
+                                    maxSeqNoOfUpdatesOrDeletes, ShardChangesAction.EMPTY_OPERATIONS_ARRAY));
                                 return;
                             }
                             Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
@@ -273,6 +277,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                                 1L,
                                 seqNoStats.getGlobalCheckpoint(),
                                 seqNoStats.getMaxSeqNo(),
+                                maxSeqNoOfUpdatesOrDeletes,
                                 ops
                             );
                             handler.accept(response);
@@ -315,6 +320,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
         for (IndexShard followingShard : follower) {
             assertThat(followingShard.estimateNumberOfHistoryOperations("test", 0), equalTo(totalOps));
         }
+        for (IndexShard followingShard : follower) {
+            assertThat(followingShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getPrimary().getMaxSeqNoOfUpdatesOrDeletes()));
+        }
     }
 
     class CCRAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
@@ -327,7 +335,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
         protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception {
             TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
                 TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getOperations(),
-                    primary, logger);
+                    request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
             return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
         }
 

+ 2 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java

@@ -59,7 +59,8 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
         }
 
         final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
-                TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger);
+            TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations,
+                numOps - 1, followerPrimary, logger);
 
         try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {
             assertThat(snapshot.totalOperations(), equalTo(operations.size()));