Browse Source

Replicate writes only to fully initialized shards (#28049)

The primary currently replicates writes to all other shard copies as soon as they're added to the routing table. Initially those shards are not even ready yet to receive these replication requests, for example when undergoing a file-based peer recovery. Based on the specific stage that the shard copies are in, they will throw different kinds of exceptions when they receive the replication requests. The primary then ignores responses from shards that match certain exception types. With this mechanism it's not possible for a primary to distinguish between a situation where a replication target shard is not allocated and ready yet to receive requests and a situation where the shard was successfully allocated and active but subsequently failed.
This commit changes replication so that only initializing shards that have successfully opened their engine are used as replication targets. This removes the need to replicate requests to initializing shards that are not even ready yet to receive those requests. This saves on network bandwidth and enables features that rely on the distinction between a "not-yet-ready" shard and a failed shard.
Yannick Welsch 7 năm trước cách đây
mục cha
commit
031415a5f6
20 tập tin đã thay đổi với 366 bổ sung193 xóa
  1. 12 20
      server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
  2. 50 15
      server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
  3. 34 34
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  4. 66 2
      server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java
  5. 2 2
      server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
  6. 5 5
      server/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java
  7. 8 2
      server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
  8. 2 2
      server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
  9. 2 2
      server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
  10. 2 2
      server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
  11. 110 44
      server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java
  12. 7 2
      server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  13. 2 2
      server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  14. 1 2
      server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
  15. 1 1
      server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
  16. 50 43
      server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java
  17. 7 7
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  18. 0 1
      server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
  19. 2 2
      test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
  20. 3 3
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

+ 12 - 20
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

@@ -117,19 +117,17 @@ public class ReplicationOperation<
             // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
             final long globalCheckpoint = primary.globalCheckpoint();
             final ReplicationGroup replicationGroup = primary.getReplicationGroup();
-            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
-            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
+            markUnavailableShardsAsStale(replicaRequest, replicationGroup);
+            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
         }
 
         successfulShards.incrementAndGet();  // mark primary as successful
         decPendingAndFinishIfNeeded();
     }
 
-    private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set<String> inSyncAllocationIds,
-                                              IndexShardRoutingTable indexShardRoutingTable) {
+    private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
         // if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale
-        for (String allocationId : Sets.difference(inSyncAllocationIds, indexShardRoutingTable.getAllAllocationIds())) {
-            // mark copy as stale
+        for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
             pendingActions.incrementAndGet();
             replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
                 ReplicationOperation.this::decPendingAndFinishIfNeeded,
@@ -140,22 +138,16 @@ public class ReplicationOperation<
     }
 
     private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
-                                   final IndexShardRoutingTable indexShardRoutingTable) {
-        final String localNodeId = primary.routingEntry().currentNodeId();
-        // If the index gets deleted after primary operation, we skip replication
-        for (final ShardRouting shard : indexShardRoutingTable) {
-            if (shard.unassigned()) {
-                assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard;
-                totalShards.incrementAndGet();
-                continue;
-            }
+                                   final ReplicationGroup replicationGroup) {
+        // for total stats, add number of unassigned shards and
+        // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
+        totalShards.addAndGet(replicationGroup.getSkippedShards().size());
 
-            if (shard.currentNodeId().equals(localNodeId) == false) {
-                performOnReplica(shard, replicaRequest, globalCheckpoint);
-            }
+        final ShardRouting primaryRouting = primary.routingEntry();
 
-            if (shard.relocating() && shard.relocatingNodeId().equals(localNodeId) == false) {
-                performOnReplica(shard.getTargetRelocatingShard(), replicaRequest, globalCheckpoint);
+        for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
+            if (shard.isSameAllocation(primaryRouting) == false) {
+                performOnReplica(shard, replicaRequest, globalCheckpoint);
             }
         }
     }

+ 50 - 15
server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java → server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index.seqno;
 
 import com.carrotsearch.hppc.ObjectLongHashMap;
 import com.carrotsearch.hppc.ObjectLongMap;
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -48,15 +49,17 @@ import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
 /**
- * This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
- * equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts
+ * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
+ *
+ * The global checkpoint is the highest sequence number for which all lower (or equal) sequence number have been processed
+ * on all shards that are currently active. Since shards count as "active" when the master starts
  * them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
  * have received all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set of
  * shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
  * <p>
  * The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
  */
-public class GlobalCheckpointTracker extends AbstractIndexShardComponent implements LongSupplier {
+public class ReplicationTracker extends AbstractIndexShardComponent implements LongSupplier {
 
     /**
      * The allocation ID for the shard to which this tracker is a component of.
@@ -146,16 +149,32 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
          */
         boolean inSync;
 
-        public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync) {
+        /**
+         * whether this shard is tracked in the replication group, i.e., should receive document updates from the primary.
+         */
+        boolean tracked;
+
+        public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked) {
             this.localCheckpoint = localCheckpoint;
             this.globalCheckpoint = globalCheckpoint;
             this.inSync = inSync;
+            this.tracked = tracked;
         }
 
         public CheckpointState(StreamInput in) throws IOException {
             this.localCheckpoint = in.readZLong();
             this.globalCheckpoint = in.readZLong();
             this.inSync = in.readBoolean();
+            if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
+                this.tracked = in.readBoolean();
+            } else {
+                // Every in-sync shard copy is also tracked (see invariant). This was the case even in earlier ES versions.
+                // Non in-sync shard copies might be tracked or not. As this information here is only serialized during relocation hand-off,
+                // after which replica recoveries cannot complete anymore (i.e. they cannot move from in-sync == false to in-sync == true),
+                // we can treat non in-sync replica shard copies as untracked. They will go through a fresh recovery against the new
+                // primary and will become tracked again under this primary before they are marked as in-sync.
+                this.tracked = inSync;
+            }
         }
 
         @Override
@@ -163,13 +182,16 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
             out.writeZLong(localCheckpoint);
             out.writeZLong(globalCheckpoint);
             out.writeBoolean(inSync);
+            if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
+                out.writeBoolean(tracked);
+            }
         }
 
         /**
          * Returns a full copy of this object
          */
         public CheckpointState copy() {
-            return new CheckpointState(localCheckpoint, globalCheckpoint, inSync);
+            return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked);
         }
 
         public long getLocalCheckpoint() {
@@ -186,6 +208,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
                 "localCheckpoint=" + localCheckpoint +
                 ", globalCheckpoint=" + globalCheckpoint +
                 ", inSync=" + inSync +
+                ", tracked=" + tracked +
                 '}';
         }
 
@@ -198,7 +221,8 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
 
             if (localCheckpoint != that.localCheckpoint) return false;
             if (globalCheckpoint != that.globalCheckpoint) return false;
-            return inSync == that.inSync;
+            if (inSync != that.inSync) return false;
+            return tracked == that.tracked;
         }
 
         @Override
@@ -206,6 +230,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
             int result = Long.hashCode(localCheckpoint);
             result = 31 * result + Long.hashCode(globalCheckpoint);
             result = 31 * result + Boolean.hashCode(inSync);
+            result = 31 * result + Boolean.hashCode(tracked);
             return result;
         }
     }
@@ -301,6 +326,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
             // blocking global checkpoint advancement only happens for shards that are not in-sync
             assert !pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync :
                 "shard copy " + entry.getKey() + " blocks global checkpoint advancement but is in-sync";
+            // in-sync shard copies are tracked
+            assert !entry.getValue().inSync || entry.getValue().tracked :
+                "shard copy " + entry.getKey() + " is in-sync but not tracked";
         }
 
         return true;
@@ -330,7 +358,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
      * @param indexSettings    the index settings
      * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
      */
-    public GlobalCheckpointTracker(
+    public ReplicationTracker(
             final ShardId shardId,
             final String allocationId,
             final IndexSettings indexSettings,
@@ -342,7 +370,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
         this.handoffInProgress = false;
         this.appliedClusterStateVersion = -1L;
         this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
-        checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false));
+        checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
         this.pendingInSync = new HashSet<>();
         this.routingTable = null;
         this.replicationGroup = null;
@@ -361,7 +389,8 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
 
     private ReplicationGroup calculateReplicationGroup() {
         return new ReplicationGroup(routingTable,
-            checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()));
+            checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
+            checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()));
     }
 
     /**
@@ -481,7 +510,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
                         final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
                             SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
                         final long globalCheckpoint = localCheckpoint;
-                        checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync));
+                        checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
                     }
                 }
             } else {
@@ -490,18 +519,20 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
                         final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
                             SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
                         final long globalCheckpoint = localCheckpoint;
-                        checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false));
+                        checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false));
                     }
                 }
                 for (String inSyncId : inSyncAllocationIds) {
                     if (shardAllocationId.equals(inSyncId)) {
                         // current shard is initially marked as not in-sync because we don't know better at that point
-                        checkpoints.get(shardAllocationId).inSync = true;
+                        CheckpointState checkpointState = checkpoints.get(shardAllocationId);
+                        checkpointState.inSync = true;
+                        checkpointState.tracked = true;
                     } else {
                         final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ?
                             SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
                         final long globalCheckpoint = localCheckpoint;
-                        checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true));
+                        checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true));
                     }
                 }
             }
@@ -516,19 +547,22 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
     }
 
     /**
-     * Called when the recovery process for a shard is ready to open the engine on the target shard. Ensures that the right data structures
-     * have been set up locally to track local checkpoint information for the shard.
+     * Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
+     * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.
      *
      * @param allocationId  the allocation ID of the shard for which recovery was initiated
      */
     public synchronized void initiateTracking(final String allocationId) {
         assert invariant();
         assert primaryMode;
+        assert handoffInProgress == false;
         CheckpointState cps = checkpoints.get(allocationId);
         if (cps == null) {
             // can happen if replica was removed from cluster but recovery process is unaware of it yet
             throw new IllegalStateException("no local checkpoint tracking information available");
         }
+        cps.tracked = true;
+        replicationGroup = calculateReplicationGroup();
         assert invariant();
     }
 
@@ -551,6 +585,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme
         assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED :
             "expected known local checkpoint for " + allocationId + " but was " + localCheckpoint;
         assert pendingInSync.contains(allocationId) == false : "shard copy " + allocationId + " is already marked as pending in-sync";
+        assert cps.tracked : "shard copy " + allocationId + " cannot be marked as in-sync as it's not tracked";
         updateLocalCheckpoint(allocationId, cps, localCheckpoint);
         // if it was already in-sync (because of a previously failed recovery attempt), global checkpoint must have been
         // stuck from advancing

+ 34 - 34
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -106,7 +106,7 @@ import org.elasticsearch.index.recovery.RecoveryStats;
 import org.elasticsearch.index.refresh.RefreshStats;
 import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.search.stats.ShardSearchStats;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
@@ -190,7 +190,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
     private final SearchOperationListener searchOperationListener;
 
-    private final GlobalCheckpointTracker globalCheckpointTracker;
+    private final ReplicationTracker replicationTracker;
 
     protected volatile ShardRouting shardRouting;
     protected volatile IndexShardState state;
@@ -298,7 +298,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
         this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
         this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
-        this.globalCheckpointTracker = new GlobalCheckpointTracker(shardId, shardRouting.allocationId().getId(), indexSettings,
+        this.replicationTracker = new ReplicationTracker(shardId, shardRouting.allocationId().getId(), indexSettings,
             SequenceNumbers.UNASSIGNED_SEQ_NO);
         // the query cache is a node-level thing, however we want the most popular filters
         // to be computed on a per-shard basis
@@ -402,7 +402,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             }
 
             if (newRouting.primary()) {
-                globalCheckpointTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, routingTable, pre60AllocationIds);
+                replicationTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, routingTable, pre60AllocationIds);
             }
 
             if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {
@@ -415,7 +415,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 }
 
                 if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
-                    globalCheckpointTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
+                    replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
                 }
 
                 changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
@@ -490,7 +490,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                                  */
                                 engine.rollTranslogGeneration();
                                 engine.fillSeqNoGaps(newPrimaryTerm);
-                                globalCheckpointTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(),
+                                replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(),
                                     getEngine().getLocalCheckpointTracker().getCheckpoint());
                                 primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
                                     @Override
@@ -517,7 +517,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                             }
                         },
                         e -> failShard("exception during primary term transition", e));
-                    globalCheckpointTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
+                    replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
                     primaryTerm = newPrimaryTerm;
                 }
             }
@@ -571,7 +571,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * @throws InterruptedException            if blocking operations is interrupted
      */
     public void relocated(
-            final String reason, final Consumer<GlobalCheckpointTracker.PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
+            final String reason, final Consumer<ReplicationTracker.PrimaryContext> consumer) throws IllegalIndexShardStateException, InterruptedException {
         assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
         try {
             indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
@@ -583,17 +583,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                  * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
                  */
                 verifyRelocatingState();
-                final GlobalCheckpointTracker.PrimaryContext primaryContext = globalCheckpointTracker.startRelocationHandoff();
+                final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff();
                 try {
                     consumer.accept(primaryContext);
                     synchronized (mutex) {
                         verifyRelocatingState();
                         changeState(IndexShardState.RELOCATED, reason);
                     }
-                    globalCheckpointTracker.completeRelocationHandoff();
+                    replicationTracker.completeRelocationHandoff();
                 } catch (final Exception e) {
                     try {
-                        globalCheckpointTracker.abortRelocationHandoff();
+                        replicationTracker.abortRelocationHandoff();
                     } catch (final Exception inner) {
                         e.addSuppressed(inner);
                     }
@@ -910,7 +910,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     @Nullable
     public SeqNoStats seqNoStats() {
         Engine engine = getEngineOrNull();
-        return engine == null ? null : engine.getLocalCheckpointTracker().getStats(globalCheckpointTracker.getGlobalCheckpoint());
+        return engine == null ? null : engine.getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
     }
 
     public IndexingStats indexingStats(String... types) {
@@ -1285,7 +1285,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         final RecoveryState.Translog translogStats = recoveryState().getTranslog();
         translogStats.totalOperations(0);
         translogStats.totalOperationsOnStart(0);
-        globalCheckpointTracker.updateGlobalCheckpointOnReplica(SequenceNumbers.NO_OPS_PERFORMED, "index created");
+        replicationTracker.updateGlobalCheckpointOnReplica(SequenceNumbers.NO_OPS_PERFORMED, "index created");
         innerOpenEngineAndTranslog(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, false);
     }
 
@@ -1304,7 +1304,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]";
             }
         }
-        globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
+        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
         innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID);
     }
 
@@ -1355,7 +1355,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             // we have to set it before we open an engine and recover from the translog because
             // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in,
             // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in.
-            globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
+            replicationTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()),
                 "read from translog checkpoint");
         }
         createNewEngine(config);
@@ -1721,7 +1721,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
     /**
      * Notifies the service to update the local checkpoint for the shard with the provided allocation ID. See
-     * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateLocalCheckpoint(String, long)} for
+     * {@link ReplicationTracker#updateLocalCheckpoint(String, long)} for
      * details.
      *
      * @param allocationId the allocation ID of the shard to update the local checkpoint for
@@ -1730,7 +1730,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public void updateLocalCheckpointForShard(final String allocationId, final long checkpoint) {
         verifyPrimary();
         verifyNotClosed();
-        globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint);
+        replicationTracker.updateLocalCheckpoint(allocationId, checkpoint);
     }
 
     /**
@@ -1742,7 +1742,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
         verifyPrimary();
         verifyNotClosed();
-        globalCheckpointTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
+        replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
     }
 
     /**
@@ -1756,19 +1756,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     /**
-     * Called when the recovery process for a shard is ready to open the engine on the target shard.
-     * See {@link GlobalCheckpointTracker#initiateTracking(String)} for details.
+     * Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
+     * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.
      *
      * @param allocationId  the allocation ID of the shard for which recovery was initiated
      */
     public void initiateTracking(final String allocationId) {
         verifyPrimary();
-        globalCheckpointTracker.initiateTracking(allocationId);
+        replicationTracker.initiateTracking(allocationId);
     }
 
     /**
      * Marks the shard with the provided allocation ID as in-sync with the primary shard. See
-     * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)}
+     * {@link ReplicationTracker#markAllocationIdAsInSync(String, long)}
      * for additional details.
      *
      * @param allocationId    the allocation ID of the shard to mark as in-sync
@@ -1776,7 +1776,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      */
     public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
         verifyPrimary();
-        globalCheckpointTracker.markAllocationIdAsInSync(allocationId, localCheckpoint);
+        replicationTracker.markAllocationIdAsInSync(allocationId, localCheckpoint);
     }
 
     /**
@@ -1794,7 +1794,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * @return the global checkpoint
      */
     public long getGlobalCheckpoint() {
-        return globalCheckpointTracker.getGlobalCheckpoint();
+        return replicationTracker.getGlobalCheckpoint();
     }
 
     /**
@@ -1805,7 +1805,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public ObjectLongMap<String> getInSyncGlobalCheckpoints() {
         verifyPrimary();
         verifyNotClosed();
-        return globalCheckpointTracker.getInSyncGlobalCheckpoints();
+        return replicationTracker.getInSyncGlobalCheckpoints();
     }
 
     /**
@@ -1819,7 +1819,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             return;
         }
         // only sync if there are not operations in flight
-        final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(globalCheckpointTracker.getGlobalCheckpoint());
+        final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
         if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
             final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
             final String allocationId = routingEntry().allocationId().getId();
@@ -1845,7 +1845,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public ReplicationGroup getReplicationGroup() {
         verifyPrimary();
         verifyNotClosed();
-        return globalCheckpointTracker.getReplicationGroup();
+        return replicationTracker.getReplicationGroup();
     }
 
     /**
@@ -1873,7 +1873,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     "that is higher than its local checkpoint [" + localCheckpoint + "]";
             return;
         }
-        globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason);
+        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason);
     }
 
     /**
@@ -1881,13 +1881,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      *
      * @param primaryContext the sequence number context
      */
-    public void activateWithPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) {
+    public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
         verifyPrimary();
         assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
         assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
             getEngine().getLocalCheckpointTracker().getCheckpoint() ==
                 primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
-        globalCheckpointTracker.activateWithPrimaryContext(primaryContext);
+        replicationTracker.activateWithPrimaryContext(primaryContext);
     }
 
     /**
@@ -1897,7 +1897,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      */
     public boolean pendingInSync() {
         verifyPrimary();
-        return globalCheckpointTracker.pendingInSync();
+        return replicationTracker.pendingInSync();
     }
 
     /**
@@ -2191,7 +2191,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, this::runTranslogRecovery, circuitBreakerService, globalCheckpointTracker);
+            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker);
     }
 
     /**
@@ -2458,8 +2458,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     // for tests
-    GlobalCheckpointTracker getGlobalCheckpointTracker() {
-        return globalCheckpointTracker;
+    ReplicationTracker getReplicationTracker() {
+        return replicationTracker;
     }
 
     /**

+ 66 - 2
server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java

@@ -20,7 +20,11 @@
 package org.elasticsearch.index.shard;
 
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.util.set.Sets;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -29,10 +33,44 @@ import java.util.Set;
 public class ReplicationGroup {
     private final IndexShardRoutingTable routingTable;
     private final Set<String> inSyncAllocationIds;
+    private final Set<String> trackedAllocationIds;
 
-    public ReplicationGroup(IndexShardRoutingTable routingTable, Set<String> inSyncAllocationIds) {
+    private final Set<String> unavailableInSyncShards; // derived from the other fields
+    private final List<ShardRouting> replicationTargets; // derived from the other fields
+    private final List<ShardRouting> skippedShards; // derived from the other fields
+
+    public ReplicationGroup(IndexShardRoutingTable routingTable, Set<String> inSyncAllocationIds, Set<String> trackedAllocationIds) {
         this.routingTable = routingTable;
         this.inSyncAllocationIds = inSyncAllocationIds;
+        this.trackedAllocationIds = trackedAllocationIds;
+
+        this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds());
+        this.replicationTargets = new ArrayList<>();
+        this.skippedShards = new ArrayList<>();
+        for (final ShardRouting shard : routingTable) {
+            if (shard.unassigned()) {
+                assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard;
+                skippedShards.add(shard);
+            } else {
+                if (trackedAllocationIds.contains(shard.allocationId().getId())) {
+                    replicationTargets.add(shard);
+                } else {
+                    assert inSyncAllocationIds.contains(shard.allocationId().getId()) == false :
+                        "in-sync shard copy but not tracked: " + shard;
+                    skippedShards.add(shard);
+                }
+                if (shard.relocating()) {
+                    ShardRouting relocationTarget = shard.getTargetRelocatingShard();
+                    if (trackedAllocationIds.contains(relocationTarget.allocationId().getId())) {
+                        replicationTargets.add(relocationTarget);
+                    } else {
+                        skippedShards.add(relocationTarget);
+                        assert inSyncAllocationIds.contains(relocationTarget.allocationId().getId()) == false :
+                            "in-sync shard copy but not tracked: " + shard;
+                    }
+                }
+            }
+        }
     }
 
     public IndexShardRoutingTable getRoutingTable() {
@@ -43,6 +81,29 @@ public class ReplicationGroup {
         return inSyncAllocationIds;
     }
 
+    /**
+     * Returns the set of shard allocation ids that are in the in-sync set but have no assigned routing entry
+     */
+    public Set<String> getUnavailableInSyncShards() {
+        return unavailableInSyncShards;
+    }
+
+    /**
+     * Returns the subset of shards in the routing table that should be replicated to. Includes relocation targets.
+     */
+    public List<ShardRouting> getReplicationTargets() {
+        return replicationTargets;
+    }
+
+    /**
+     * Returns the subset of shards in the routing table that are unassigned or initializing and not ready yet to receive operations
+     * (i.e. engine not opened yet). Includes relocation targets.
+     */
+    public List<ShardRouting> getSkippedShards() {
+        return skippedShards;
+    }
+
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -51,13 +112,15 @@ public class ReplicationGroup {
         ReplicationGroup that = (ReplicationGroup) o;
 
         if (!routingTable.equals(that.routingTable)) return false;
-        return inSyncAllocationIds.equals(that.inSyncAllocationIds);
+        if (!inSyncAllocationIds.equals(that.inSyncAllocationIds)) return false;
+        return trackedAllocationIds.equals(that.trackedAllocationIds);
     }
 
     @Override
     public int hashCode() {
         int result = routingTable.hashCode();
         result = 31 * result + inSyncAllocationIds.hashCode();
+        result = 31 * result + trackedAllocationIds.hashCode();
         return result;
     }
 
@@ -66,6 +129,7 @@ public class ReplicationGroup {
         return "ReplicationGroup{" +
             "routingTable=" + routingTable +
             ", inSyncAllocationIds=" + inSyncAllocationIds +
+            ", trackedAllocationIds=" + trackedAllocationIds +
             '}';
     }
 

+ 2 - 2
server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -55,7 +55,7 @@ import org.elasticsearch.index.IndexComponent;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardRelocatedException;
@@ -742,7 +742,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
          * - Updates and persists the new routing value.
          * - Updates the primary term if this shard is a primary.
          * - Updates the allocation ids that are tracked by the shard if it is a primary.
-         *   See {@link GlobalCheckpointTracker#updateFromMaster(long, Set, IndexShardRoutingTable, Set)} for details.
+         *   See {@link ReplicationTracker#updateFromMaster(long, Set, IndexShardRoutingTable, Set)} for details.
          *
          * @param shardRouting                the new routing entry
          * @param primaryTerm                 the new primary term

+ 5 - 5
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java

@@ -21,7 +21,7 @@ package org.elasticsearch.indices.recovery;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.transport.TransportRequest;
 
@@ -34,7 +34,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
 
     private long recoveryId;
     private ShardId shardId;
-    private GlobalCheckpointTracker.PrimaryContext primaryContext;
+    private ReplicationTracker.PrimaryContext primaryContext;
 
     /**
      * Initialize an empty request (used to serialize into when reading from a stream).
@@ -50,7 +50,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
      * @param primaryContext the primary context
      */
     RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId,
-                                         final GlobalCheckpointTracker.PrimaryContext primaryContext) {
+                                         final ReplicationTracker.PrimaryContext primaryContext) {
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.primaryContext = primaryContext;
@@ -64,7 +64,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
         return shardId;
     }
 
-    GlobalCheckpointTracker.PrimaryContext primaryContext() {
+    ReplicationTracker.PrimaryContext primaryContext() {
         return primaryContext;
     }
 
@@ -73,7 +73,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest {
         super.readFrom(in);
         recoveryId = in.readLong();
         shardId = ShardId.readShardId(in);
-        primaryContext = new GlobalCheckpointTracker.PrimaryContext(in);
+        primaryContext = new ReplicationTracker.PrimaryContext(in);
     }
 
     @Override

+ 8 - 2
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -185,8 +185,6 @@ public class RecoverySourceHandler {
             assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
                 + startingSeqNo + "]";
 
-            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
-
             try {
                 // For a sequence based recovery, the target can keep its local translog
                 prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo));
@@ -194,6 +192,14 @@ public class RecoverySourceHandler {
                 throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
             }
 
+            /*
+             * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
+             * This means that any document indexed into the primary after this will be replicated to this replica as well
+             * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
+             * all documents up to maxSeqNo in phase2.
+             */
+            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
+
             final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
             /*
              * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all

+ 2 - 2
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -41,7 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.mapper.MapperException;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
@@ -387,7 +387,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
     }
 
     @Override
-    public void handoffPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) {
+    public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
         indexShard.activateWithPrimaryContext(primaryContext);
     }
 

+ 2 - 2
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

@@ -19,7 +19,7 @@
 package org.elasticsearch.indices.recovery;
 
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
@@ -55,7 +55,7 @@ public interface RecoveryTargetHandler {
      *
      * @param primaryContext the primary context from the relocation source
      */
-    void handoffPrimaryContext(GlobalCheckpointTracker.PrimaryContext primaryContext);
+    void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext);
 
     /**
      * Index a set of translog operations on the target

+ 2 - 2
server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

@@ -23,7 +23,7 @@ import org.apache.lucene.store.RateLimiter;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
@@ -100,7 +100,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
     }
 
     @Override
-    public void handoffPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) {
+    public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
         transportService.submitRequest(
                 targetNode,
                 PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT,

+ 110 - 44
server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
@@ -69,23 +70,30 @@ public class ReplicationOperationTests extends ESTestCase {
         final String index = "test";
         final ShardId shardId = new ShardId(index, "_na_", 0);
 
-        ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
-        IndexMetaData indexMetaData = state.getMetaData().index(index);
+        ClusterState initialState = stateWithActivePrimary(index, true, randomInt(5));
+        IndexMetaData indexMetaData = initialState.getMetaData().index(index);
         final long primaryTerm = indexMetaData.primaryTerm(0);
-        final IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId);
+        final IndexShardRoutingTable indexShardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId);
         ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
         if (primaryShard.relocating() && randomBoolean()) {
             // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
-            state = ClusterState.builder(state)
-                .nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
+            initialState = ClusterState.builder(initialState)
+                .nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
             primaryShard = primaryShard.getTargetRelocatingShard();
         }
         // add a few in-sync allocation ids that don't have corresponding routing entries
-        Set<String> staleAllocationIds = Sets.newHashSet(generateRandomStringArray(4, 10, false));
-        state = ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).put(IndexMetaData.builder(indexMetaData)
-            .putInSyncAllocationIds(0, Sets.union(indexMetaData.inSyncAllocationIds(0), staleAllocationIds)))).build();
+        final Set<String> staleAllocationIds = Sets.newHashSet(generateRandomStringArray(4, 10, false));
 
-        final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state);
+        final Set<String> inSyncAllocationIds = Sets.union(indexMetaData.inSyncAllocationIds(0), staleAllocationIds);
+
+        final Set<String> trackedShards = new HashSet<>();
+        final Set<String> untrackedShards = new HashSet<>();
+        addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards);
+        trackedShards.addAll(staleAllocationIds);
+
+        final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards);
+
+        final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
 
         final Map<ShardRouting, Exception> expectedFailures = new HashMap<>();
         final Set<ShardRouting> expectedFailedShards = new HashSet<>();
@@ -109,8 +117,8 @@ public class ReplicationOperationTests extends ESTestCase {
         Request request = new Request(shardId);
         PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
         final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures);
-        final ClusterState finalState = state;
-        final TestPrimary primary = new TestPrimary(primaryShard, () -> finalState);
+
+        final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup);
         final TestReplicationOperation op = new TestReplicationOperation(request,
             primary, listener, replicasProxy);
         op.execute();
@@ -126,42 +134,73 @@ public class ReplicationOperationTests extends ESTestCase {
         assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size()));
         final List<ShardRouting> unassignedShards =
             indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED);
-        final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size();
-        assertThat(shardInfo.getTotal(), equalTo(totalShards));
+        final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size();
+        assertThat(replicationGroup.toString(), shardInfo.getTotal(), equalTo(totalShards));
 
         assertThat(primary.knownLocalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.localCheckpoint));
         assertThat(primary.knownLocalCheckpoints, equalTo(replicasProxy.generatedLocalCheckpoints));
         assertThat(primary.knownGlobalCheckpoints, equalTo(replicasProxy.generatedGlobalCheckpoints));
     }
 
+    private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, ShardRouting primaryShard, Set<String> trackedShards,
+                                 Set<String> untrackedShards) {
+        for (ShardRouting shr : indexShardRoutingTable.shards()) {
+            if (shr.unassigned() == false) {
+                if (shr.initializing()) {
+                    if (randomBoolean()) {
+                        trackedShards.add(shr.allocationId().getId());
+                    } else {
+                        untrackedShards.add(shr.allocationId().getId());
+                    }
+                } else {
+                    trackedShards.add(shr.allocationId().getId());
+                    if (shr.relocating()) {
+                        if (primaryShard == shr.getTargetRelocatingShard() || randomBoolean()) {
+                            trackedShards.add(shr.getTargetRelocatingShard().allocationId().getId());
+                        } else {
+                            untrackedShards.add(shr.getTargetRelocatingShard().allocationId().getId());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     public void testDemotedPrimary() throws Exception {
         final String index = "test";
         final ShardId shardId = new ShardId(index, "_na_", 0);
 
-        ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(2), randomInt(2));
-        IndexMetaData indexMetaData = state.getMetaData().index(index);
+        ClusterState initialState = stateWithActivePrimary(index, true, 1 + randomInt(2), randomInt(2));
+        IndexMetaData indexMetaData = initialState.getMetaData().index(index);
         final long primaryTerm = indexMetaData.primaryTerm(0);
-        ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
+        final IndexShardRoutingTable indexShardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId);
+        ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
         if (primaryShard.relocating() && randomBoolean()) {
             // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
-            state = ClusterState.builder(state)
-                .nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
+            initialState = ClusterState.builder(initialState)
+                .nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
             primaryShard = primaryShard.getTargetRelocatingShard();
         }
-        // add in-sync allocation id that doesn't have a corresponding routing entry
-        state = ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).put(IndexMetaData.builder(indexMetaData)
-            .putInSyncAllocationIds(0, Sets.union(indexMetaData.inSyncAllocationIds(0), Sets.newHashSet(randomAlphaOfLength(10))))))
-            .build();
+        // add an in-sync allocation id that doesn't have a corresponding routing entry
+        final Set<String> staleAllocationIds = Sets.newHashSet(randomAlphaOfLength(10));
+        final Set<String> inSyncAllocationIds = Sets.union(indexMetaData.inSyncAllocationIds(0), staleAllocationIds);
+        final Set<String> trackedShards = new HashSet<>();
+        addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, new HashSet<>());
+        trackedShards.addAll(staleAllocationIds);
+
+        final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards);
 
-        final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state);
+        final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
 
         final Map<ShardRouting, Exception> expectedFailures = new HashMap<>();
+        if (expectedReplicas.isEmpty()) {
+            return;
+        }
         final ShardRouting failedReplica = randomFrom(new ArrayList<>(expectedReplicas));
         expectedFailures.put(failedReplica, new CorruptIndexException("simulated", (String) null));
 
         Request request = new Request(shardId);
         PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
-        final ClusterState finalState = state;
         final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean();
         final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures) {
             @Override
@@ -187,7 +226,7 @@ public class ReplicationOperationTests extends ESTestCase {
             }
         };
         AtomicBoolean primaryFailed = new AtomicBoolean();
-        final TestPrimary primary = new TestPrimary(primaryShard, () -> finalState) {
+        final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) {
             @Override
             public void failShard(String message, Exception exception) {
                 assertTrue(primaryFailed.compareAndSet(false, true));
@@ -207,6 +246,12 @@ public class ReplicationOperationTests extends ESTestCase {
         final String index = "test";
         final ShardId shardId = new ShardId(index, "_na_", 0);
         final ClusterState initialState = stateWithActivePrimary(index, true, 0);
+        Set<String> inSyncAllocationIds = initialState.metaData().index(index).inSyncAllocationIds(0);
+        IndexShardRoutingTable shardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId);
+        Set<String> trackedShards = new HashSet<>();
+        addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
+        ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
+
         final ClusterState stateWithAddedReplicas;
         if (randomBoolean()) {
             stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED,
@@ -214,16 +259,24 @@ public class ReplicationOperationTests extends ESTestCase {
         } else {
             stateWithAddedReplicas = state(index, true, ShardRoutingState.RELOCATING);
         }
-        AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
-        logger.debug("--> using initial state:\n{}", state.get());
+
+        inSyncAllocationIds = stateWithAddedReplicas.metaData().index(index).inSyncAllocationIds(0);
+        shardRoutingTable = stateWithAddedReplicas.getRoutingTable().shardRoutingTable(shardId);
+        trackedShards = new HashSet<>();
+        addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
+
+        ReplicationGroup updatedReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
+
+        final AtomicReference<ReplicationGroup> replicationGroup = new AtomicReference<>(initialReplicationGroup);
+        logger.debug("--> using initial replicationGroup:\n{}", replicationGroup.get());
         final long primaryTerm = initialState.getMetaData().index(shardId.getIndexName()).primaryTerm(shardId.id());
-        final ShardRouting primaryShard = state.get().routingTable().shardRoutingTable(shardId).primaryShard();
-        final TestPrimary primary = new TestPrimary(primaryShard, state::get) {
+        final ShardRouting primaryShard = updatedReplicationGroup.getRoutingTable().primaryShard();
+        final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get) {
             @Override
             public Result perform(Request request) throws Exception {
                 Result result = super.perform(request);
-                state.set(stateWithAddedReplicas);
-                logger.debug("--> state after primary operation:\n{}", state.get());
+                replicationGroup.set(updatedReplicationGroup);
+                logger.debug("--> state after primary operation:\n{}", replicationGroup.get());
                 return result;
             }
         };
@@ -235,7 +288,7 @@ public class ReplicationOperationTests extends ESTestCase {
         op.execute();
 
         assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
-        Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state.get());
+        Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, stateWithAddedReplicas, trackedShards);
         assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
     }
 
@@ -265,10 +318,16 @@ public class ReplicationOperationTests extends ESTestCase {
             passesActiveShardCheck ? "succeed" : "retry", state);
         final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id());
         final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id());
+
+        final Set<String> inSyncAllocationIds = state.metaData().index(index).inSyncAllocationIds(0);
+        Set<String> trackedShards = new HashSet<>();
+        addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
+        final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
+
         PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
         final ShardRouting primaryShard = shardRoutingTable.primaryShard();
         final TestReplicationOperation op = new TestReplicationOperation(request,
-            new TestPrimary(primaryShard, () -> state),
+            new TestPrimary(primaryShard, () -> initialReplicationGroup),
                 listener, new TestReplicaProxy(primaryTerm), logger, "test");
 
         if (passesActiveShardCheck) {
@@ -296,10 +355,15 @@ public class ReplicationOperationTests extends ESTestCase {
         final long primaryTerm = indexMetaData.primaryTerm(0);
         final ShardRouting primaryRouting = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
 
+        final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(0);
+        final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id());
+        final Set<String> trackedShards = shardRoutingTable.getAllAllocationIds();
+        final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
+
         final boolean fatal = randomBoolean();
         final AtomicBoolean primaryFailed = new AtomicBoolean();
         final ReplicationOperation.Primary<Request, Request, TestPrimary.Result> primary =
-            new TestPrimary(primaryRouting, () -> state) {
+            new TestPrimary(primaryRouting, () -> initialReplicationGroup) {
 
             @Override
             public void failShard(String message, Exception exception) {
@@ -330,10 +394,10 @@ public class ReplicationOperationTests extends ESTestCase {
         final ShardInfo shardInfo = listener.actionGet().getShardInfo();
         assertThat(shardInfo.getFailed(), equalTo(0));
         assertThat(shardInfo.getFailures(), arrayWithSize(0));
-        assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state).size()));
+        assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state, trackedShards).size()));
     }
 
-    private Set<ShardRouting> getExpectedReplicas(ShardId shardId, ClusterState state) {
+    private Set<ShardRouting> getExpectedReplicas(ShardId shardId, ClusterState state, Set<String> trackedShards) {
         Set<ShardRouting> expectedReplicas = new HashSet<>();
         String localNodeId = state.nodes().getLocalNodeId();
         if (state.routingTable().hasIndex(shardId.getIndexName())) {
@@ -342,11 +406,15 @@ public class ReplicationOperationTests extends ESTestCase {
                     continue;
                 }
                 if (localNodeId.equals(shardRouting.currentNodeId()) == false) {
-                    expectedReplicas.add(shardRouting);
+                    if (trackedShards.contains(shardRouting.allocationId().getId())) {
+                        expectedReplicas.add(shardRouting);
+                    }
                 }
 
                 if (shardRouting.relocating() && localNodeId.equals(shardRouting.relocatingNodeId()) == false) {
-                    expectedReplicas.add(shardRouting.getTargetRelocatingShard());
+                    if (trackedShards.contains(shardRouting.getTargetRelocatingShard().allocationId().getId())) {
+                        expectedReplicas.add(shardRouting.getTargetRelocatingShard());
+                    }
                 }
             }
         }
@@ -379,13 +447,13 @@ public class ReplicationOperationTests extends ESTestCase {
         final ShardRouting routing;
         final long localCheckpoint;
         final long globalCheckpoint;
-        final Supplier<ClusterState> clusterStateSupplier;
+        final Supplier<ReplicationGroup> replicationGroupSupplier;
         final Map<String, Long> knownLocalCheckpoints = new HashMap<>();
         final Map<String, Long> knownGlobalCheckpoints = new HashMap<>();
 
-        TestPrimary(ShardRouting routing, Supplier<ClusterState> clusterStateSupplier) {
+        TestPrimary(ShardRouting routing, Supplier<ReplicationGroup> replicationGroupSupplier) {
             this.routing = routing;
-            this.clusterStateSupplier = clusterStateSupplier;
+            this.replicationGroupSupplier = replicationGroupSupplier;
             this.localCheckpoint = random().nextLong();
             this.globalCheckpoint = randomNonNegativeLong();
         }
@@ -453,9 +521,7 @@ public class ReplicationOperationTests extends ESTestCase {
 
         @Override
         public ReplicationGroup getReplicationGroup() {
-            ClusterState clusterState = clusterStateSupplier.get();
-            return new ReplicationGroup(clusterState.routingTable().shardRoutingTable(routing.shardId()),
-                clusterState.metaData().index(routing.index()).inSyncAllocationIds(routing.id()));
+            return replicationGroupSupplier.get();
         }
 
     }

+ 7 - 2
server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -95,6 +95,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -683,9 +684,13 @@ public class TransportReplicationActionTests extends ESTestCase {
         final IndexShard shard = mock(IndexShard.class);
         when(shard.getPrimaryTerm()).thenReturn(primaryTerm);
         when(shard.routingEntry()).thenReturn(routingEntry);
+        IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
+        Set<String> inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) :
+            clusterService.state().metaData().index(index).inSyncAllocationIds(0);
         when(shard.getReplicationGroup()).thenReturn(
-            new ReplicationGroup(clusterService.state().routingTable().shardRoutingTable(shardId),
-                clusterService.state().metaData().index(index).inSyncAllocationIds(0)));
+            new ReplicationGroup(shardRoutingTable,
+                inSyncIds,
+                shardRoutingTable.getAllAllocationIds()));
         doAnswer(invocation -> {
             ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
             return null;

+ 2 - 2
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -109,7 +109,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.RootObjectMapper;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
@@ -1968,7 +1968,7 @@ public class InternalEngineTests extends EngineTestCase {
             final ShardRouting primary = TestShardRouting.newShardRouting("test", shardId.id(), "node1", null, true,
                 ShardRoutingState.STARTED, allocationId);
             final ShardRouting replica = TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.STARTED);
-            GlobalCheckpointTracker gcpTracker = (GlobalCheckpointTracker) initialEngine.config().getGlobalCheckpointSupplier();
+            ReplicationTracker gcpTracker = (ReplicationTracker) initialEngine.config().getGlobalCheckpointSupplier();
             gcpTracker.updateFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(),
                 replica.allocationId().getId())),
                 new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build(), Collections.emptySet());

+ 1 - 2
server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -499,8 +499,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
 
             @Override
             public org.elasticsearch.index.shard.ReplicationGroup getReplicationGroup() {
-                return new org.elasticsearch.index.shard.ReplicationGroup(replicationGroup.routingTable(Function.identity()),
-                    replicationGroup.activeIds());
+                return replicationGroup.primary.getReplicationGroup();
             }
 
         }

+ 1 - 1
server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

@@ -247,7 +247,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
 
             // check that local checkpoint of new primary is properly tracked after primary promotion
             assertThat(newPrimary.getLocalCheckpoint(), equalTo(totalDocs - 1L));
-            assertThat(IndexShardTestCase.getGlobalCheckpointTracker(newPrimary)
+            assertThat(IndexShardTestCase.getReplicationTracker(newPrimary)
                 .getTrackedLocalCheckpointForShard(newPrimary.routingEntry().allocationId().getId()).getLocalCheckpoint(),
                 equalTo(totalDocs - 1L));
 

+ 50 - 43
server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java → server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java

@@ -59,10 +59,10 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.not;
 
-public class GlobalCheckpointTrackerTests extends ESTestCase {
+public class ReplicationTrackerTests extends ESTestCase {
 
     public void testEmptyShards() {
-        final GlobalCheckpointTracker tracker = newTracker(AllocationId.newInitializing());
+        final ReplicationTracker tracker = newTracker(AllocationId.newInitializing());
         assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
     }
 
@@ -116,7 +116,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
 
 
         final AllocationId primaryId = active.iterator().next();
-        final GlobalCheckpointTracker tracker = newTracker(primaryId);
+        final ReplicationTracker tracker = newTracker(primaryId);
         assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
 
         logger.info("--> using allocations");
@@ -134,7 +134,9 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
 
         tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet());
         tracker.activatePrimaryMode(NO_OPS_PERFORMED);
-        initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
+        assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1));
+        initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
+        assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1 + initializing.size()));
         allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId)));
 
         assertThat(tracker.getGlobalCheckpoint(), equalTo(minLocalCheckpoint));
@@ -164,9 +166,9 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         // now notify for the new id
         if (randomBoolean()) {
             tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4));
-            markAllocationIdAsInSyncQuietly(tracker, extraId.getId(), randomInt((int) minLocalCheckpointAfterUpdates));
+            markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), randomInt((int) minLocalCheckpointAfterUpdates));
         } else {
-            markAllocationIdAsInSyncQuietly(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4));
+            markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4));
         }
 
         // now it should be incremented
@@ -180,10 +182,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         assigned.putAll(active);
         assigned.putAll(initializing);
         AllocationId primaryId = active.keySet().iterator().next();
-        final GlobalCheckpointTracker tracker = newTracker(primaryId);
+        final ReplicationTracker tracker = newTracker(primaryId);
         tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet());
         tracker.activatePrimaryMode(NO_OPS_PERFORMED);
-        randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
+        randomSubsetOf(initializing.keySet()).forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
         final AllocationId missingActiveID = randomFrom(active.keySet());
         assigned
                 .entrySet()
@@ -205,11 +207,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         logger.info("active: {}, initializing: {}", active, initializing);
 
         AllocationId primaryId = active.keySet().iterator().next();
-        final GlobalCheckpointTracker tracker = newTracker(primaryId);
+        final ReplicationTracker tracker = newTracker(primaryId);
         tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet());
         tracker.activatePrimaryMode(NO_OPS_PERFORMED);
         randomSubsetOf(randomIntBetween(1, initializing.size() - 1),
-            initializing.keySet()).forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
+            initializing.keySet()).forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
 
         active.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP));
 
@@ -225,12 +227,12 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         final Map<AllocationId, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
         final Map<AllocationId, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
         final AllocationId primaryId = active.keySet().iterator().next();
-        final GlobalCheckpointTracker tracker = newTracker(primaryId);
+        final ReplicationTracker tracker = newTracker(primaryId);
         tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet());
         tracker.activatePrimaryMode(NO_OPS_PERFORMED);
-        initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
+        initializing.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
         nonApproved.keySet().forEach(k ->
-            expectThrows(IllegalStateException.class, () -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)));
+            expectThrows(IllegalStateException.class, () -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)));
 
         List<Map<AllocationId, Long>> allocations = Arrays.asList(active, initializing, nonApproved);
         Collections.shuffle(allocations, random());
@@ -260,13 +262,13 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         if (randomBoolean()) {
             allocations.putAll(initializingToBeRemoved);
         }
-        final GlobalCheckpointTracker tracker = newTracker(primaryId);
+        final ReplicationTracker tracker = newTracker(primaryId);
         tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet());
         tracker.activatePrimaryMode(NO_OPS_PERFORMED);
         if (randomBoolean()) {
-            initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
+            initializingToStay.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
         } else {
-            initializing.forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
+            initializing.forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
         }
         if (randomBoolean()) {
             allocations.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP));
@@ -302,7 +304,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         final AtomicBoolean complete = new AtomicBoolean();
         final AllocationId inSyncAllocationId = AllocationId.newInitializing();
         final AllocationId trackingAllocationId = AllocationId.newInitializing();
-        final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId);
+        final ReplicationTracker tracker = newTracker(inSyncAllocationId);
         tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
             routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet());
         tracker.activatePrimaryMode(globalCheckpoint);
@@ -310,6 +312,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
             try {
                 // synchronize starting with the test thread
                 barrier.await();
+                tracker.initiateTracking(trackingAllocationId.getId());
                 tracker.markAllocationIdAsInSync(trackingAllocationId.getId(), localCheckpoint);
                 complete.set(true);
                 // synchronize with the test thread checking if we are no longer waiting
@@ -343,8 +346,8 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         thread.join();
     }
 
-    private GlobalCheckpointTracker newTracker(final AllocationId allocationId) {
-        return new GlobalCheckpointTracker(
+    private ReplicationTracker newTracker(final AllocationId allocationId) {
+        return new ReplicationTracker(
                 new ShardId("test", "_na_", 0),
                 allocationId.getId(),
                 IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
@@ -358,7 +361,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         final AtomicBoolean interrupted = new AtomicBoolean();
         final AllocationId inSyncAllocationId = AllocationId.newInitializing();
         final AllocationId trackingAllocationId = AllocationId.newInitializing();
-        final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId);
+        final ReplicationTracker tracker = newTracker(inSyncAllocationId);
         tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
             routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet());
         tracker.activatePrimaryMode(globalCheckpoint);
@@ -370,6 +373,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
                 throw new RuntimeException(e);
             }
             try {
+                tracker.initiateTracking(trackingAllocationId.getId());
                 tracker.markAllocationIdAsInSync(trackingAllocationId.getId(), localCheckpoint);
             } catch (final InterruptedException e) {
                 interrupted.set(true);
@@ -407,7 +411,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();
         AllocationId primaryId = activeAllocationIds.iterator().next();
         IndexShardRoutingTable routingTable = routingTable(initializingIds, primaryId);
-        final GlobalCheckpointTracker tracker = newTracker(primaryId);
+        final ReplicationTracker tracker = newTracker(primaryId);
         tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet());
         tracker.activatePrimaryMode(NO_OPS_PERFORMED);
         assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
@@ -508,6 +512,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         final Thread thread = new Thread(() -> {
             try {
                 barrier.await();
+                tracker.initiateTracking(newSyncingAllocationId.getId());
                 tracker.markAllocationIdAsInSync(newSyncingAllocationId.getId(), localCheckpoint);
                 barrier.await();
             } catch (final BrokenBarrierException | InterruptedException e) {
@@ -547,13 +552,13 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
     }
 
     /**
-     * If we do not update the global checkpoint in {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} after adding the
+     * If we do not update the global checkpoint in {@link ReplicationTracker#markAllocationIdAsInSync(String, long)} after adding the
      * allocation ID to the in-sync set and removing it from pending, the local checkpoint update that freed the thread waiting for the
      * local checkpoint to advance could miss updating the global checkpoint in a race if the waiting thread did not add the allocation
      * ID to the in-sync set and remove it from the pending set before the local checkpoint updating thread executed the global checkpoint
-     * update. This test fails without an additional call to {@link GlobalCheckpointTracker#updateGlobalCheckpointOnPrimary()} after
-     * removing the allocation ID from the pending set in {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} (even if a
-     * call is added after notifying all waiters in {@link GlobalCheckpointTracker#updateLocalCheckpoint(String, long)}).
+     * update. This test fails without an additional call to {@link ReplicationTracker#updateGlobalCheckpointOnPrimary()} after
+     * removing the allocation ID from the pending set in {@link ReplicationTracker#markAllocationIdAsInSync(String, long)} (even if a
+     * call is added after notifying all waiters in {@link ReplicationTracker#updateLocalCheckpoint(String, long)}).
      *
      * @throws InterruptedException   if the main test thread was interrupted while waiting
      * @throws BrokenBarrierException if the barrier was broken while the main test thread was waiting
@@ -565,7 +570,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         final CyclicBarrier barrier = new CyclicBarrier(4);
 
         final int activeLocalCheckpoint = randomIntBetween(0, Integer.MAX_VALUE - 1);
-        final GlobalCheckpointTracker tracker = newTracker(active);
+        final ReplicationTracker tracker = newTracker(active);
         tracker.updateFromMaster(
                 randomNonNegativeLong(),
                 Collections.singleton(active.getId()),
@@ -595,6 +600,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         final Thread markingThread = new Thread(() -> {
             try {
                 barrier.await();
+                tracker.initiateTracking(initializing.getId());
                 tracker.markAllocationIdAsInSync(initializing.getId(), initializingLocalCheckpoint - 1);
             } catch (final BrokenBarrierException | InterruptedException e) {
                 throw new RuntimeException(e);
@@ -619,10 +625,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
 
         FakeClusterState clusterState = initialState();
         final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId();
-        GlobalCheckpointTracker oldPrimary =
-                new GlobalCheckpointTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO);
-        GlobalCheckpointTracker newPrimary =
-                new GlobalCheckpointTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO);
+        ReplicationTracker oldPrimary =
+                new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO);
+        ReplicationTracker newPrimary =
+                new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO);
 
         Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));
 
@@ -647,12 +653,12 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         }
 
         // simulate transferring the global checkpoint to the new primary after finalizing recovery before the handoff
-        markAllocationIdAsInSyncQuietly(
+        markAsTrackingAndInSyncQuietly(
                 oldPrimary,
                 newPrimary.shardAllocationId,
                 Math.max(SequenceNumbers.NO_OPS_PERFORMED, oldPrimary.getGlobalCheckpoint() + randomInt(5)));
         oldPrimary.updateGlobalCheckpointForShard(newPrimary.shardAllocationId, oldPrimary.getGlobalCheckpoint());
-        GlobalCheckpointTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff();
+        ReplicationTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff();
 
         if (randomBoolean()) {
             // cluster state update after primary context handoff
@@ -685,7 +691,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         BytesStreamOutput output = new BytesStreamOutput();
         primaryContext.writeTo(output);
         StreamInput streamInput = output.bytes().streamInput();
-        primaryContext = new GlobalCheckpointTracker.PrimaryContext(streamInput);
+        primaryContext = new ReplicationTracker.PrimaryContext(streamInput);
         switch (randomInt(3)) {
             case 0: {
                 // apply cluster state update on old primary while primary context is being transferred
@@ -730,10 +736,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
          * will update its global checkpoint state without the old primary learning of it, and the old primary could have updated its
          * global checkpoint state after the primary context was transferred.
          */
-        Map<String, GlobalCheckpointTracker.CheckpointState> oldPrimaryCheckpointsCopy = new HashMap<>(oldPrimary.checkpoints);
+        Map<String, ReplicationTracker.CheckpointState> oldPrimaryCheckpointsCopy = new HashMap<>(oldPrimary.checkpoints);
         oldPrimaryCheckpointsCopy.remove(oldPrimary.shardAllocationId);
         oldPrimaryCheckpointsCopy.remove(newPrimary.shardAllocationId);
-        Map<String, GlobalCheckpointTracker.CheckpointState> newPrimaryCheckpointsCopy = new HashMap<>(newPrimary.checkpoints);
+        Map<String, ReplicationTracker.CheckpointState> newPrimaryCheckpointsCopy = new HashMap<>(newPrimary.checkpoints);
         newPrimaryCheckpointsCopy.remove(oldPrimary.shardAllocationId);
         newPrimaryCheckpointsCopy.remove(newPrimary.shardAllocationId);
         assertThat(newPrimaryCheckpointsCopy, equalTo(oldPrimaryCheckpointsCopy));
@@ -761,7 +767,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
     public void testIllegalStateExceptionIfUnknownAllocationId() {
         final AllocationId active = AllocationId.newInitializing();
         final AllocationId initializing = AllocationId.newInitializing();
-        final GlobalCheckpointTracker tracker = newTracker(active);
+        final ReplicationTracker tracker = newTracker(active);
         tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()),
             routingTable(Collections.singleton(initializing), active), emptySet());
         tracker.activatePrimaryMode(NO_OPS_PERFORMED);
@@ -790,7 +796,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
                 .map(ShardRouting::allocationId).collect(Collectors.toSet());
         }
 
-        public void apply(GlobalCheckpointTracker gcp) {
+        public void apply(ReplicationTracker gcp) {
             gcp.updateFromMaster(version, ids(inSyncIds), routingTable, Collections.emptySet());
         }
     }
@@ -818,20 +824,20 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
                 routingTable(initializingAllocationIds, primaryShard));
     }
 
-    private static void activatePrimary(GlobalCheckpointTracker gcp) {
+    private static void activatePrimary(ReplicationTracker gcp) {
         gcp.activatePrimaryMode(randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10));
     }
 
-    private static void randomLocalCheckpointUpdate(GlobalCheckpointTracker gcp) {
+    private static void randomLocalCheckpointUpdate(ReplicationTracker gcp) {
         String allocationId = randomFrom(gcp.checkpoints.keySet());
         long currentLocalCheckpoint = gcp.checkpoints.get(allocationId).getLocalCheckpoint();
         gcp.updateLocalCheckpoint(allocationId, Math.max(SequenceNumbers.NO_OPS_PERFORMED, currentLocalCheckpoint + randomInt(5)));
     }
 
-    private static void randomMarkInSync(GlobalCheckpointTracker gcp) {
+    private static void randomMarkInSync(ReplicationTracker gcp) {
         String allocationId = randomFrom(gcp.checkpoints.keySet());
         long newLocalCheckpoint = Math.max(NO_OPS_PERFORMED, gcp.getGlobalCheckpoint() + randomInt(5));
-        markAllocationIdAsInSyncQuietly(gcp, allocationId, newLocalCheckpoint);
+        markAsTrackingAndInSyncQuietly(gcp, allocationId, newLocalCheckpoint);
     }
 
     private static FakeClusterState randomUpdateClusterState(Set<String> allocationIds, FakeClusterState clusterState) {
@@ -876,9 +882,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
         }).collect(Collectors.toSet());
     }
 
-    private static void markAllocationIdAsInSyncQuietly(
-            final GlobalCheckpointTracker tracker, final String allocationId, final long localCheckpoint) {
+    private static void markAsTrackingAndInSyncQuietly(
+        final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) {
         try {
+            tracker.initiateTracking(allocationId);
             tracker.markAllocationIdAsInSync(allocationId, localCheckpoint);
         } catch (final InterruptedException e) {
             throw new RuntimeException(e);

+ 7 - 7
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -419,7 +419,7 @@ public class IndexShardTests extends IndexShardTestCase {
      * This test makes sure that people can use the shard routing entry to check whether a shard was already promoted to
      * a primary. Concretely this means, that when we publish the routing entry via {@link IndexShard#routingEntry()} the following
      * should have happened
-     * 1) Internal state (ala GlobalCheckpointTracker) have been updated
+     * 1) Internal state (ala ReplicationTracker) have been updated
      * 2) Primary term is set to the new term
      */
     public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException {
@@ -1583,7 +1583,7 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
         // check that local checkpoint of new primary is properly tracked after recovery
         assertThat(newShard.getLocalCheckpoint(), equalTo(totalOps - 1L));
-        assertThat(newShard.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard(newShard.routingEntry().allocationId().getId())
+        assertThat(newShard.getReplicationTracker().getTrackedLocalCheckpointForShard(newShard.routingEntry().allocationId().getId())
                 .getLocalCheckpoint(), equalTo(totalOps - 1L));
         assertDocCount(newShard, totalOps);
         closeShards(newShard);
@@ -1602,7 +1602,7 @@ public class IndexShardTests extends IndexShardTestCase {
 
         // check that local checkpoint of new primary is properly tracked after primary relocation
         assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L));
-        assertThat(primaryTarget.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard(
+        assertThat(primaryTarget.getReplicationTracker().getTrackedLocalCheckpointForShard(
             primaryTarget.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(totalOps - 1L));
         assertDocCount(primaryTarget, totalOps);
         closeShards(primarySource, primaryTarget);
@@ -1813,9 +1813,9 @@ public class IndexShardTests extends IndexShardTestCase {
         }));
         assertThat(target.getLocalCheckpoint(), equalTo(0L));
         assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L));
-        assertThat(target.getGlobalCheckpointTracker().getGlobalCheckpoint(), equalTo(0L));
+        assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L));
         IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted());
-        assertThat(target.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard(
+        assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard(
             target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L));
 
         assertDocs(target, "0");
@@ -2259,9 +2259,9 @@ public class IndexShardTests extends IndexShardTestCase {
             }
             // check that local checkpoint of new primary is properly tracked after recovery
             assertThat(targetShard.getLocalCheckpoint(), equalTo(1L));
-            assertThat(targetShard.getGlobalCheckpointTracker().getGlobalCheckpoint(), equalTo(1L));
+            assertThat(targetShard.getReplicationTracker().getGlobalCheckpoint(), equalTo(1L));
             IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry()));
-            assertThat(targetShard.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard(
+            assertThat(targetShard.getReplicationTracker().getTrackedLocalCheckpointForShard(
                 targetShard.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(1L));
             assertDocCount(targetShard, 2);
         }

+ 0 - 1
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -48,7 +48,6 @@ import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext.Document;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.store.DirectoryService;
 import org.elasticsearch.index.store.Store;

+ 2 - 2
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -60,7 +60,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
@@ -425,7 +425,7 @@ public abstract class EngineTestCase extends ESTestCase {
                 TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler,
                 new NoneCircuitBreakerService(),
                 globalCheckpointSupplier == null ?
-                    new GlobalCheckpointTracker(shardId, allocationId.getId(), indexSettings,
+                    new ReplicationTracker(shardId, allocationId.getId(), indexSettings,
                         SequenceNumbers.UNASSIGNED_SEQ_NO) : globalCheckpointSupplier);
         return config;
     }

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -60,7 +60,7 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
+import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -640,7 +640,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         return indexShard.getEngine();
     }
 
-    public static GlobalCheckpointTracker getGlobalCheckpointTracker(IndexShard indexShard) {
-        return indexShard.getGlobalCheckpointTracker();
+    public static ReplicationTracker getReplicationTracker(IndexShard indexShard) {
+        return indexShard.getReplicationTracker();
     }
 }