Browse Source

Update global checkpoint when increasing primary term on replica (#25422)

When a replica shard increases its primary term under the mandate of a new primary, it should also update its global checkpoint. This guarantees that the replica's global checkpoint is at least as high as that of the new primary, and it provides a starting point for the primary/replica resync.

Relates to #25355, #10708
Yannick Welsch 8 years ago
parent
commit
8ae61c0fc4

+ 2 - 3
core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -183,7 +183,7 @@ public abstract class TransportReplicationAction<
 
     /**
      * Synchronously execute the specified replica operation. This is done under a permit from
-     * {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}.
+     * {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}.
      *
      * @param shardRequest the request to the replica shard
      * @param replica      the replica shard to perform the operation on
@@ -521,7 +521,6 @@ public abstract class TransportReplicationAction<
         @Override
         public void onResponse(Releasable releasable) {
             try {
-                replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
                 final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
                 releasable.close(); // release shard operation lock before responding to caller
                 final TransportReplicationAction.ReplicaResponse response =
@@ -596,7 +595,7 @@ public abstract class TransportReplicationAction<
                 throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
                     actualAllocationId);
             }
-            replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor);
+            replica.acquireReplicaOperationPermit(request.primaryTerm, globalCheckpoint, this, executor);
         }
 
         /**

+ 29 - 2
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -2031,29 +2031,47 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * name.
      *
      * @param operationPrimaryTerm the operation primary term
+     * @param globalCheckpoint     the global checkpoint associated with the request
      * @param onPermitAcquired     the listener for permit acquisition
      * @param executorOnDelay      the name of the executor to invoke the listener on if permit acquisition is delayed
      */
-    public void acquireReplicaOperationPermit(
-            final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
+    public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint,
+                                              final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
         verifyNotClosed();
         verifyReplicationTarget();
+        final boolean globalCheckpointUpdated;
         if (operationPrimaryTerm > primaryTerm) {
             synchronized (primaryTermMutex) {
                 if (operationPrimaryTerm > primaryTerm) {
+                    IndexShardState shardState = state();
+                    // only roll translog and update primary term if shard has made it past recovery
+                    // Having a new primary term here means that the old primary failed and that there is a new primary, which again
+                    // means that the master will fail this shard as all initializing shards are failed when a primary is selected
+                    // We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
+                    if (shardState != IndexShardState.POST_RECOVERY &&
+                        shardState != IndexShardState.STARTED &&
+                        shardState != IndexShardState.RELOCATED) {
+                        throw new IndexShardNotStartedException(shardId, shardState);
+                    }
                     try {
                         indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
                             assert operationPrimaryTerm > primaryTerm :
                                 "shard term already update.  op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
                             primaryTerm = operationPrimaryTerm;
+                            updateGlobalCheckpointOnReplica(globalCheckpoint);
                             getEngine().getTranslog().rollGeneration();
                         });
+                        globalCheckpointUpdated = true;
                     } catch (final Exception e) {
                         onPermitAcquired.onFailure(e);
                         return;
                     }
+                } else {
+                    globalCheckpointUpdated = false;
                 }
             }
+        } else {
+            globalCheckpointUpdated = false;
         }
 
         assert operationPrimaryTerm <= primaryTerm
@@ -2072,6 +2090,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                                     primaryTerm);
                             onPermitAcquired.onFailure(new IllegalStateException(message));
                         } else {
+                            if (globalCheckpointUpdated == false) {
+                                try {
+                                    updateGlobalCheckpointOnReplica(globalCheckpoint);
+                                } catch (Exception e) {
+                                    releasable.close();
+                                    onPermitAcquired.onFailure(e);
+                                    return;
+                                }
+                            }
                             onPermitAcquired.onResponse(releasable);
                         }
                     }

+ 2 - 2
core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -1161,7 +1161,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
         doAnswer(invocation -> {
             long term = (Long)invocation.getArguments()[0];
-            ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
+            ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[2];
             final long primaryTerm = indexShard.getPrimaryTerm();
             if (term < primaryTerm) {
                 throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])",
@@ -1170,7 +1170,7 @@ public class TransportReplicationActionTests extends ESTestCase {
             count.incrementAndGet();
             callback.onResponse(count::decrementAndGet);
             return null;
-        }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
+        }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
         when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
             final ClusterState state = clusterService.state();
             final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

+ 1 - 1
core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -456,7 +456,7 @@ public class TransportWriteActionTests extends ESTestCase {
             count.incrementAndGet();
             callback.onResponse(count::decrementAndGet);
             return null;
-        }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
+        }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
         when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
             final ClusterState state = clusterService.state();
             final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

+ 1 - 1
core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -518,11 +518,11 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                         .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
                 replica.acquireReplicaOperationPermit(
                         request.primaryTerm(),
+                        globalCheckpoint,
                         new ActionListener<Releasable>() {
                             @Override
                             public void onResponse(Releasable releasable) {
                                 try {
-                                    replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
                                     performOnReplica(request, replica);
                                     releasable.close();
                                     listener.onResponse(

+ 100 - 49
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -275,13 +275,22 @@ public class IndexShardTests extends IndexShardTestCase {
             // expected
         }
         try {
-            indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX);
+            indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbersService.UNASSIGNED_SEQ_NO, null,
+                ThreadPool.Names.INDEX);
             fail("we should not be able to increment anymore");
         } catch (IndexShardClosedException e) {
             // expected
         }
     }
 
+    public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException {
+        IndexShard indexShard = newShard(false);
+        expectThrows(IndexShardNotStartedException.class, () ->
+            indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100),
+                SequenceNumbersService.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX));
+        closeShards(indexShard);
+    }
+
     public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
         final IndexShard indexShard = newStartedShard(false);
 
@@ -299,6 +308,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 }
                 indexShard.acquireReplicaOperationPermit(
                         indexShard.getPrimaryTerm(),
+                        indexShard.getGlobalCheckpoint(),
                         new ActionListener<Releasable>() {
                             @Override
                             public void onResponse(Releasable releasable) {
@@ -477,7 +487,7 @@ public class IndexShardTests extends IndexShardTestCase {
         assertEquals(0, indexShard.getActiveOperationsCount());
         if (indexShard.routingEntry().isRelocationTarget() == false) {
             try {
-                indexShard.acquireReplicaOperationPermit(primaryTerm, null, ThreadPool.Names.INDEX);
+                indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX);
                 fail("shard shouldn't accept operations as replica");
             } catch (IllegalStateException ignored) {
 
@@ -503,11 +513,11 @@ public class IndexShardTests extends IndexShardTestCase {
     private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
         throws ExecutionException, InterruptedException {
         PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
-        indexShard.acquireReplicaOperationPermit(opPrimaryTerm, fut, ThreadPool.Names.INDEX);
+        indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX);
         return fut.get();
     }
 
-    public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException {
+    public void testOperationPermitOnReplicaShards() throws Exception {
         final ShardId shardId = new ShardId("test", "_na_", 0);
         final IndexShard indexShard;
         final boolean engineClosed;
@@ -557,10 +567,17 @@ public class IndexShardTests extends IndexShardTestCase {
         final long primaryTerm = indexShard.getPrimaryTerm();
         final long translogGen = engineClosed ? -1 : indexShard.getTranslog().getGeneration().translogFileGeneration;
 
-        final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
-        assertEquals(1, indexShard.getActiveOperationsCount());
-        final Releasable operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
-        assertEquals(2, indexShard.getActiveOperationsCount());
+        final Releasable operation1;
+        final Releasable operation2;
+        if (engineClosed == false) {
+            operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
+            assertEquals(1, indexShard.getActiveOperationsCount());
+            operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
+            assertEquals(2, indexShard.getActiveOperationsCount());
+        } else {
+            operation1 = null;
+            operation2 = null;
+        }
 
         {
             final AtomicBoolean onResponse = new AtomicBoolean();
@@ -579,7 +596,8 @@ public class IndexShardTests extends IndexShardTestCase {
                 }
             };
 
-            indexShard.acquireReplicaOperationPermit(primaryTerm - 1, onLockAcquired, ThreadPool.Names.INDEX);
+            indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbersService.UNASSIGNED_SEQ_NO, onLockAcquired,
+                ThreadPool.Names.INDEX);
 
             assertFalse(onResponse.get());
             assertTrue(onFailure.get());
@@ -593,6 +611,21 @@ public class IndexShardTests extends IndexShardTestCase {
             final AtomicReference<Exception> onFailure = new AtomicReference<>();
             final CyclicBarrier barrier = new CyclicBarrier(2);
             final long newPrimaryTerm = primaryTerm + 1 + randomInt(20);
+            if (engineClosed == false) {
+                assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
+                assertThat(indexShard.getGlobalCheckpoint(), equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
+            }
+            final long newGlobalCheckPoint;
+            if (engineClosed || randomBoolean()) {
+                newGlobalCheckPoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+            } else {
+                long localCheckPoint = indexShard.getGlobalCheckpoint() + randomInt(100);
+                // advance local checkpoint
+                for (int i = 0; i <= localCheckPoint; i++) {
+                    indexShard.markSeqNoAsNoop(i, indexShard.getPrimaryTerm(), "dummy doc");
+                }
+                newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
+            }
             // but you can not increment with a new primary term until the operations on the older primary term complete
             final Thread thread = new Thread(() -> {
                 try {
@@ -600,55 +633,72 @@ public class IndexShardTests extends IndexShardTestCase {
                 } catch (final BrokenBarrierException | InterruptedException e) {
                     throw new RuntimeException(e);
                 }
-                indexShard.acquireReplicaOperationPermit(
-                    newPrimaryTerm,
-                        new ActionListener<Releasable>() {
-                            @Override
-                            public void onResponse(Releasable releasable) {
-                                assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
-                                onResponse.set(true);
-                                releasable.close();
-                                finish();
-                            }
+                ActionListener<Releasable> listener = new ActionListener<Releasable>() {
+                    @Override
+                    public void onResponse(Releasable releasable) {
+                        assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
+                        assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
+                        onResponse.set(true);
+                        releasable.close();
+                        finish();
+                    }
 
-                            @Override
-                            public void onFailure(Exception e) {
-                                onFailure.set(e);
-                                finish();
-                            }
+                    @Override
+                    public void onFailure(Exception e) {
+                        onFailure.set(e);
+                        finish();
+                    }
 
-                            private void finish() {
-                                try {
-                                    barrier.await();
-                                } catch (final BrokenBarrierException | InterruptedException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            }
-                        },
+                    private void finish() {
+                        try {
+                            barrier.await();
+                        } catch (final BrokenBarrierException | InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+                try {
+                    indexShard.acquireReplicaOperationPermit(
+                        newPrimaryTerm,
+                        newGlobalCheckPoint,
+                        listener,
                         ThreadPool.Names.SAME);
+                } catch (Exception e) {
+                    listener.onFailure(e);
+                }
             });
             thread.start();
             barrier.await();
-            // our operation should be blocked until the previous operations complete
-            assertFalse(onResponse.get());
-            assertNull(onFailure.get());
-            assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
-            Releasables.close(operation1);
-            // our operation should still be blocked
-            assertFalse(onResponse.get());
-            assertNull(onFailure.get());
-            assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
-            Releasables.close(operation2);
-            barrier.await();
-            // now lock acquisition should have succeeded
-            assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
-            if (engineClosed) {
+            if (indexShard.state() == IndexShardState.CREATED || indexShard.state() == IndexShardState.RECOVERING) {
+                barrier.await();
+                assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
                 assertFalse(onResponse.get());
-                assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class));
+                assertThat(onFailure.get(), instanceOf(IndexShardNotStartedException.class));
+                Releasables.close(operation1);
+                Releasables.close(operation2);
             } else {
-                assertTrue(onResponse.get());
+                // our operation should be blocked until the previous operations complete
+                assertFalse(onResponse.get());
                 assertNull(onFailure.get());
-                assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
+                assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
+                Releasables.close(operation1);
+                // our operation should still be blocked
+                assertFalse(onResponse.get());
+                assertNull(onFailure.get());
+                assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm));
+                Releasables.close(operation2);
+                barrier.await();
+                // now lock acquisition should have succeeded
+                assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
+                if (engineClosed) {
+                    assertFalse(onResponse.get());
+                    assertThat(onFailure.get(), instanceOf(AlreadyClosedException.class));
+                } else {
+                    assertTrue(onResponse.get());
+                    assertNull(onFailure.get());
+                    assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
+                    assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
+                }
             }
             thread.join();
             assertEquals(0, indexShard.getActiveOperationsCount());
@@ -676,6 +726,7 @@ public class IndexShardTests extends IndexShardTestCase {
             }
             indexShard.acquireReplicaOperationPermit(
                     primaryTerm + increment,
+                    indexShard.getGlobalCheckpoint(),
                     new ActionListener<Releasable>() {
                         @Override
                         public void onResponse(Releasable releasable) {