|  | @@ -35,7 +35,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.routing.ShardRouting;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.routing.ShardRoutingState;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.breaker.CircuitBreaker;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.breaker.CircuitBreakingException;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.transport.TransportAddress;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.unit.TimeValue;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.set.Sets;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.shard.IndexShardNotStartedException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.shard.IndexShardState;
 | 
	
	
		
			
				|  | @@ -43,8 +48,13 @@ import org.elasticsearch.index.shard.ReplicationGroup;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.shard.ShardId;
 | 
	
		
			
				|  |  |  import org.elasticsearch.node.NodeClosedException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.test.ESTestCase;
 | 
	
		
			
				|  |  | +import org.elasticsearch.threadpool.TestThreadPool;
 | 
	
		
			
				|  |  | +import org.elasticsearch.threadpool.ThreadPool;
 | 
	
		
			
				|  |  | +import org.elasticsearch.transport.ConnectTransportException;
 | 
	
		
			
				|  |  | +import org.elasticsearch.transport.RemoteTransportException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.SendRequestTransportException;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import java.net.InetAddress;
 | 
	
		
			
				|  |  |  import java.util.ArrayList;
 | 
	
		
			
				|  |  |  import java.util.Collections;
 | 
	
		
			
				|  |  |  import java.util.HashMap;
 | 
	
	
		
			
				|  | @@ -54,6 +64,7 @@ import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.Set;
 | 
	
		
			
				|  |  |  import java.util.concurrent.ExecutionException;
 | 
	
		
			
				|  |  |  import java.util.concurrent.atomic.AtomicBoolean;
 | 
	
		
			
				|  |  | +import java.util.concurrent.atomic.AtomicInteger;
 | 
	
		
			
				|  |  |  import java.util.concurrent.atomic.AtomicReference;
 | 
	
		
			
				|  |  |  import java.util.function.Supplier;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -67,6 +78,20 @@ import static org.hamcrest.Matchers.nullValue;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    private ThreadPool threadPool;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override // fixture hook: runs the superclass setup first, then creates the thread pool used by the tests
 | 
	
		
			
				|  |  | +    public void setUp() throws Exception {
 | 
	
		
			
				|  |  | +        super.setUp();
 | 
	
		
			
				|  |  | +        threadPool = new TestThreadPool(getTestName()); // constructed with the current test's name
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override // fixture hook: terminates the thread pool created in setUp, then runs the superclass teardown
 | 
	
		
			
				|  |  | +    public void tearDown() throws Exception {
 | 
	
		
			
				|  |  | +        terminate(threadPool); // done before super.tearDown()
 | 
	
		
			
				|  |  | +        super.tearDown();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      public void testReplication() throws Exception {
 | 
	
		
			
				|  |  |          final String index = "test";
 | 
	
		
			
				|  |  |          final ShardId shardId = new ShardId(index, "_na_", 0);
 | 
	
	
		
			
				|  | @@ -92,7 +117,7 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards);
 | 
	
		
			
				|  |  |          trackedShards.addAll(staleAllocationIds);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards);
 | 
	
		
			
				|  |  | +        final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -117,7 +142,7 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
 | 
	
		
			
				|  |  |          final TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup);
 | 
	
		
			
				|  |  | +        final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool);
 | 
	
		
			
				|  |  |          final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm);
 | 
	
		
			
				|  |  |          op.execute();
 | 
	
		
			
				|  |  |          assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
 | 
	
	
		
			
				|  | @@ -141,6 +166,81 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          assertThat(primary.knownGlobalCheckpoints, equalTo(replicasProxy.generatedGlobalCheckpoints));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    public void testRetryTransientReplicationFailure() throws Exception { // every expected replica gets a simulated failure on a retryable proxy; none may end up failed
 | 
	
		
			
				|  |  | +        final String index = "test";
 | 
	
		
			
				|  |  | +        final ShardId shardId = new ShardId(index, "_na_", 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        ClusterState initialState = stateWithActivePrimary(index, true, randomInt(5));
 | 
	
		
			
				|  |  | +        IndexMetadata indexMetadata = initialState.getMetadata().index(index);
 | 
	
		
			
				|  |  | +        final long primaryTerm = indexMetadata.primaryTerm(0);
 | 
	
		
			
				|  |  | +        final IndexShardRoutingTable indexShardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId);
 | 
	
		
			
				|  |  | +        ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
 | 
	
		
			
				|  |  | +        if (primaryShard.relocating() && randomBoolean()) {
 | 
	
		
			
				|  |  | +            // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
 | 
	
		
			
				|  |  | +            initialState = ClusterState.builder(initialState)
 | 
	
		
			
				|  |  | +                .nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
 | 
	
		
			
				|  |  | +            primaryShard = primaryShard.getTargetRelocatingShard();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        // add a few in-sync allocation ids that don't have corresponding routing entries
 | 
	
		
			
				|  |  | +        final Set<String> staleAllocationIds = Sets.newHashSet(generateRandomStringArray(4, 10, false));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final Set<String> inSyncAllocationIds = Sets.union(indexMetadata.inSyncAllocationIds(0), staleAllocationIds);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final Set<String> trackedShards = new HashSet<>();
 | 
	
		
			
				|  |  | +        final Set<String> untrackedShards = new HashSet<>();
 | 
	
		
			
				|  |  | +        addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards);
 | 
	
		
			
				|  |  | +        trackedShards.addAll(staleAllocationIds);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final Map<ShardRouting, Exception> simulatedFailures = new HashMap<>(); // one simulated failure per expected replica
 | 
	
		
			
				|  |  | +        for (ShardRouting replica : expectedReplicas) {
 | 
	
		
			
				|  |  | +            Exception cause;
 | 
	
		
			
				|  |  | +            Exception exception;
 | 
	
		
			
				|  |  | +            if (randomBoolean()) { // remote failure wrapping a circuit-breaker or rejected-execution cause
 | 
	
		
			
				|  |  | +                if (randomBoolean()) {
 | 
	
		
			
				|  |  | +                    cause = new CircuitBreakingException("broken", CircuitBreaker.Durability.PERMANENT);
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    cause = new EsRejectedExecutionException("rejected");
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                exception = new RemoteTransportException("remote", cause);
 | 
	
		
			
				|  |  | +            } else { // connect failure against a loopback node, reported directly without a wrapper
 | 
	
		
			
				|  |  | +                TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300);
 | 
	
		
			
				|  |  | +                DiscoveryNode node = new DiscoveryNode("replica", address, Version.CURRENT);
 | 
	
		
			
				|  |  | +                cause = new ConnectTransportException(node, "broken");
 | 
	
		
			
				|  |  | +                exception = cause;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            logger.debug("--> simulating failure on {} with [{}]", replica, exception.getClass().getSimpleName());
 | 
	
		
			
				|  |  | +            simulatedFailures.put(replica, exception);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        Request request = new Request(shardId);
 | 
	
		
			
				|  |  | +        PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
 | 
	
		
			
				|  |  | +        final TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures, true); // true = retryable proxy
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool);
 | 
	
		
			
				|  |  | +        final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm,
 | 
	
		
			
				|  |  | +            TimeValue.timeValueMillis(20), TimeValue.timeValueSeconds(60)); // initialRetryBackoffBound = 20ms, retryTimeout = 60s
 | 
	
		
			
				|  |  | +        op.execute();
 | 
	
		
			
				|  |  | +        assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
 | 
	
		
			
				|  |  | +        assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
 | 
	
		
			
				|  |  | +        assertThat(replicasProxy.failedReplicas.size(), equalTo(0)); // no replica may be recorded as failed
 | 
	
		
			
				|  |  | +        assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds));
 | 
	
		
			
				|  |  | +        assertThat("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get(), equalTo(true));
 | 
	
		
			
				|  |  | +        ShardInfo shardInfo = listener.actionGet().getShardInfo();
 | 
	
		
			
				|  |  | +        assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size())); // the primary plus every expected replica
 | 
	
		
			
				|  |  | +        final List<ShardRouting> unassignedShards = indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED);
 | 
	
		
			
				|  |  | +        final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size();
 | 
	
		
			
				|  |  | +        assertThat(replicationGroup.toString(), shardInfo.getTotal(), equalTo(totalShards));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        assertThat(primary.knownLocalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.localCheckpoint));
 | 
	
		
			
				|  |  | +        assertThat(primary.knownLocalCheckpoints, equalTo(replicasProxy.generatedLocalCheckpoints));
 | 
	
		
			
				|  |  | +        assertThat(primary.knownGlobalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.globalCheckpoint));
 | 
	
		
			
				|  |  | +        assertThat(primary.knownGlobalCheckpoints, equalTo(replicasProxy.generatedGlobalCheckpoints));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, ShardRouting primaryShard, Set<String> trackedShards,
 | 
	
		
			
				|  |  |                                   Set<String> untrackedShards) {
 | 
	
		
			
				|  |  |          for (ShardRouting shr : indexShardRoutingTable.shards()) {
 | 
	
	
		
			
				|  | @@ -187,7 +287,7 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, new HashSet<>());
 | 
	
		
			
				|  |  |          trackedShards.addAll(staleAllocationIds);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards);
 | 
	
		
			
				|  |  | +        final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -234,7 +334,7 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          };
 | 
	
		
			
				|  |  |          AtomicBoolean primaryFailed = new AtomicBoolean();
 | 
	
		
			
				|  |  | -        final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) {
 | 
	
		
			
				|  |  | +        final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool) {
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  |              public void failShard(String message, Exception exception) {
 | 
	
		
			
				|  |  |                  assertThat(exception, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class));
 | 
	
	
		
			
				|  | @@ -263,7 +363,7 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          IndexShardRoutingTable shardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId);
 | 
	
		
			
				|  |  |          Set<String> trackedShards = new HashSet<>();
 | 
	
		
			
				|  |  |          addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
 | 
	
		
			
				|  |  | -        ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
 | 
	
		
			
				|  |  | +        ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final ClusterState stateWithAddedReplicas;
 | 
	
		
			
				|  |  |          if (randomBoolean()) {
 | 
	
	
		
			
				|  | @@ -278,13 +378,13 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          trackedShards = new HashSet<>();
 | 
	
		
			
				|  |  |          addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        ReplicationGroup updatedReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
 | 
	
		
			
				|  |  | +        ReplicationGroup updatedReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final AtomicReference<ReplicationGroup> replicationGroup = new AtomicReference<>(initialReplicationGroup);
 | 
	
		
			
				|  |  |          logger.debug("--> using initial replicationGroup:\n{}", replicationGroup.get());
 | 
	
		
			
				|  |  |          final long primaryTerm = initialState.getMetadata().index(shardId.getIndexName()).primaryTerm(shardId.id());
 | 
	
		
			
				|  |  |          final ShardRouting primaryShard = updatedReplicationGroup.getRoutingTable().primaryShard();
 | 
	
		
			
				|  |  | -        final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get) {
 | 
	
		
			
				|  |  | +        final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get, threadPool) {
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  |              public void perform(Request request, ActionListener<Result> listener) {
 | 
	
		
			
				|  |  |                  super.perform(request, ActionListener.map(listener, result -> {
 | 
	
	
		
			
				|  | @@ -336,13 +436,13 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          final Set<String> inSyncAllocationIds = state.metadata().index(index).inSyncAllocationIds(0);
 | 
	
		
			
				|  |  |          Set<String> trackedShards = new HashSet<>();
 | 
	
		
			
				|  |  |          addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
 | 
	
		
			
				|  |  | -        final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
 | 
	
		
			
				|  |  | +        final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
 | 
	
		
			
				|  |  |          final ShardRouting primaryShard = shardRoutingTable.primaryShard();
 | 
	
		
			
				|  |  |          final TestReplicationOperation op = new TestReplicationOperation(request,
 | 
	
		
			
				|  |  | -            new TestPrimary(primaryShard, () -> initialReplicationGroup),
 | 
	
		
			
				|  |  | -                listener, new TestReplicaProxy(), logger, "test", primaryTerm);
 | 
	
		
			
				|  |  | +            new TestPrimary(primaryShard, () -> initialReplicationGroup, threadPool),
 | 
	
		
			
				|  |  | +                listener, new TestReplicaProxy(), logger, threadPool, "test", primaryTerm);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          if (passesActiveShardCheck) {
 | 
	
		
			
				|  |  |              assertThat(op.checkActiveShardCount(), nullValue());
 | 
	
	
		
			
				|  | @@ -372,12 +472,12 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(0);
 | 
	
		
			
				|  |  |          final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id());
 | 
	
		
			
				|  |  |          final Set<String> trackedShards = shardRoutingTable.getAllAllocationIds();
 | 
	
		
			
				|  |  | -        final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
 | 
	
		
			
				|  |  | +        final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final boolean fatal = randomBoolean();
 | 
	
		
			
				|  |  |          final AtomicBoolean primaryFailed = new AtomicBoolean();
 | 
	
		
			
				|  |  |          final ReplicationOperation.Primary<Request, Request, TestPrimary.Result> primary =
 | 
	
		
			
				|  |  | -            new TestPrimary(primaryRouting, () -> initialReplicationGroup) {
 | 
	
		
			
				|  |  | +            new TestPrimary(primaryRouting, () -> initialReplicationGroup, threadPool) {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  |              public void failShard(String message, Exception exception) {
 | 
	
	
		
			
				|  | @@ -460,15 +560,17 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          final long globalCheckpoint;
 | 
	
		
			
				|  |  |          final long maxSeqNoOfUpdatesOrDeletes;
 | 
	
		
			
				|  |  |          final Supplier<ReplicationGroup> replicationGroupSupplier;
 | 
	
		
			
				|  |  | +        final PendingReplicationActions pendingReplicationActions;
 | 
	
		
			
				|  |  |          final Map<String, Long> knownLocalCheckpoints = new HashMap<>();
 | 
	
		
			
				|  |  |          final Map<String, Long> knownGlobalCheckpoints = new HashMap<>();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        TestPrimary(ShardRouting routing, Supplier<ReplicationGroup> replicationGroupSupplier) {
 | 
	
		
			
				|  |  | +        TestPrimary(ShardRouting routing, Supplier<ReplicationGroup> replicationGroupSupplier, ThreadPool threadPool) { // threadPool is handed to PendingReplicationActions below
 | 
	
		
			
				|  |  |              this.routing = routing;
 | 
	
		
			
				|  |  |              this.replicationGroupSupplier = replicationGroupSupplier;
 | 
	
		
			
				|  |  |              this.localCheckpoint = random().nextLong();
 | 
	
		
			
				|  |  |              this.globalCheckpoint = randomNonNegativeLong();
 | 
	
		
			
				|  |  |              this.maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong();
 | 
	
		
			
				|  |  | +            this.pendingReplicationActions = new PendingReplicationActions(routing.shardId(), threadPool); // created for this primary's shard id
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
	
		
			
				|  | @@ -555,6 +657,11 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |              return replicationGroupSupplier.get();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        @Override // returns this primary's PendingReplicationActions after passing it the current replication group
 | 
	
		
			
				|  |  | +        public PendingReplicationActions getPendingReplicationActions() {
 | 
	
		
			
				|  |  | +            pendingReplicationActions.accept(getReplicationGroup()); // invoked on every call, with whatever getReplicationGroup() returns now
 | 
	
		
			
				|  |  | +            return pendingReplicationActions;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      static class ReplicaResponse implements ReplicationOperation.ReplicaResponse {
 | 
	
	
		
			
				|  | @@ -580,7 +687,10 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      static class TestReplicaProxy implements ReplicationOperation.Replicas<Request> {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        private final int attemptsBeforeSuccess;
 | 
	
		
			
				|  |  | +        private final AtomicInteger attemptsNumber = new AtomicInteger(0);
 | 
	
		
			
				|  |  |          final Map<ShardRouting, Exception> opFailures;
 | 
	
		
			
				|  |  | +        private final boolean retryable;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          final Set<ShardRouting> failedReplicas = ConcurrentCollections.newConcurrentSet();
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -595,7 +705,17 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          TestReplicaProxy(Map<ShardRouting, Exception> opFailures) {
 | 
	
		
			
				|  |  | +            this(opFailures, false); // delegates with retryable = false
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        TestReplicaProxy(Map<ShardRouting, Exception> opFailures, boolean retryable) { // retryable selects how many performOn calls may report a configured failure
 | 
	
		
			
				|  |  |              this.opFailures = opFailures;
 | 
	
		
			
				|  |  | +            this.retryable = retryable;
 | 
	
		
			
				|  |  | +            if (retryable) {
 | 
	
		
			
				|  |  | +                attemptsBeforeSuccess = randomInt(2) + 1; // presumably 1..3 if randomInt's upper bound is inclusive - TODO confirm
 | 
	
		
			
				|  |  | +            } else {
 | 
	
		
			
				|  |  | +                attemptsBeforeSuccess = Integer.MAX_VALUE; // attempt counter effectively never exceeds this, so configured failures keep being reported
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
	
		
			
				|  | @@ -606,9 +726,19 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |                  final long globalCheckpoint,
 | 
	
		
			
				|  |  |                  final long maxSeqNoOfUpdatesOrDeletes,
 | 
	
		
			
				|  |  |                  final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
 | 
	
		
			
				|  |  | -            assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
 | 
	
		
			
				|  |  | -            assertFalse("primary post replication actions should run after replication", request.runPostReplicationActionsOnPrimary.get());
 | 
	
		
			
				|  |  | -            if (opFailures.containsKey(replica)) {
 | 
	
		
			
				|  |  | +            boolean added = request.processedOnReplicas.add(replica);
 | 
	
		
			
				|  |  | +            if (retryable == false) {
 | 
	
		
			
				|  |  | +                assertTrue("replica request processed twice on [" + replica + "]", added);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            // If replication is not retryable OR this is the first attempt, the post replication actions
 | 
	
		
			
				|  |  | +            // should not have run.
 | 
	
		
			
				|  |  | +            if (retryable == false || added) {
 | 
	
		
			
				|  |  | +                assertFalse("primary post replication actions should run after replication",
 | 
	
		
			
				|  |  | +                    request.runPostReplicationActionsOnPrimary.get());
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            // In a retryable scenario, only the first attemptsBeforeSuccess calls (counted across all replicas) fail
 | 
	
		
			
				|  |  | +            int n = attemptsNumber.incrementAndGet();
 | 
	
		
			
				|  |  | +            if (opFailures.containsKey(replica) && n <= attemptsBeforeSuccess) {
 | 
	
		
			
				|  |  |                  listener.onFailure(opFailures.get(replica));
 | 
	
		
			
				|  |  |              } else {
 | 
	
		
			
				|  |  |                  final long generatedLocalCheckpoint = random().nextLong();
 | 
	
	
		
			
				|  | @@ -643,15 +773,31 @@ public class ReplicationOperationTests extends ESTestCase {
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      class TestReplicationOperation extends ReplicationOperation<Request, Request, TestPrimary.Result> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary, // variant with explicit retry settings
 | 
	
		
			
				|  |  | +                ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, long primaryTerm,
 | 
	
		
			
				|  |  | +                TimeValue initialRetryBackoffBound, TimeValue retryTimeout) {
 | 
	
		
			
				|  |  | +            this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, threadPool, "test", primaryTerm, // uses the enclosing test's logger and threadPool, opType "test"
 | 
	
		
			
				|  |  | +                initialRetryBackoffBound, retryTimeout);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
 | 
	
		
			
				|  |  | -                ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, long primaryTerm) {
 | 
	
		
			
				|  |  | -            this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, "test", primaryTerm);
 | 
	
		
			
				|  |  | +                                 ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, long primaryTerm) {
 | 
	
		
			
				|  |  | +            this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, threadPool, "test", primaryTerm); // delegates to the variant that supplies default retry settings
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
 | 
	
		
			
				|  |  | +                                 ActionListener<TestPrimary.Result> listener,
 | 
	
		
			
				|  |  | +                                 Replicas<Request> replicas, Logger logger, ThreadPool threadPool, String opType, long primaryTerm) {
 | 
	
		
			
				|  |  | +            this(request, primary, listener, replicas, logger, threadPool, opType, primaryTerm, TimeValue.timeValueMillis(50), // default initialRetryBackoffBound = 50ms
 | 
	
		
			
				|  |  | +                TimeValue.timeValueSeconds(1)); // default retryTimeout = 1s
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
 | 
	
		
			
				|  |  |                                   ActionListener<TestPrimary.Result> listener,
 | 
	
		
			
				|  |  | -                                 Replicas<Request> replicas, Logger logger, String opType, long primaryTerm) {
 | 
	
		
			
				|  |  | -            super(request, primary, listener, replicas, logger, opType, primaryTerm);
 | 
	
		
			
				|  |  | +                                 Replicas<Request> replicas, Logger logger, ThreadPool threadPool, String opType, long primaryTerm,
 | 
	
		
			
				|  |  | +                                 TimeValue initialRetryBackoffBound, TimeValue retryTimeout) {
 | 
	
		
			
				|  |  | +            super(request, primary, listener, replicas, logger, threadPool, opType, primaryTerm, initialRetryBackoffBound, retryTimeout); // all arguments are forwarded unchanged to ReplicationOperation
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 |