Browse Source

Move global checkpoint sync to write threadpool (#96364)

This commit moves the global checkpoint sync action to the write thread
pool. Additionally, it moves the sync pathway to the same pathway as the
location sync so that location syncs and global checkpoint syncs will
worksteal against each other instead of generating independent syncs.
Tim Brooks 2 years ago
parent
commit
0cf80f4f2d

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java

@@ -37,7 +37,8 @@ public class RejectionActionIT extends ESIntegTestCase {
             .put("thread_pool.search.size", 1)
             .put("thread_pool.search.queue_size", 1)
             .put("thread_pool.write.size", 1)
-            .put("thread_pool.write.queue_size", 1)
+            // Needs to be 2 since we have concurrent indexing and global checkpoint syncs
+            .put("thread_pool.write.queue_size", 2)
             .put("thread_pool.get.size", 1)
             .put("thread_pool.get.queue_size", 1)
             .build();

+ 4 - 1
server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -160,7 +160,10 @@ public class IndexShardIT extends ESSingleNodeTestCase {
             Translog.Location lastWriteLocation = tlog.getLastWriteLocation();
             try {
                 // the lastWriteLocaltion has a Integer.MAX_VALUE size so we have to create a new one
-                return tlog.ensureSynced(new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0));
+                return tlog.ensureSynced(
+                    new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0),
+                    SequenceNumbers.UNASSIGNED_SEQ_NO
+                );
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }

+ 5 - 0
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -758,6 +758,11 @@ public abstract class Engine implements Closeable {
      */
     public abstract void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener);
 
+    /**
+     * Ensures that the global checkpoint has been persisted to the underlying storage.
+     */
+    public abstract void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener);
+
     public abstract void syncTranslog() throws IOException;
 
     /**

+ 21 - 5
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -179,7 +179,7 @@ public class InternalEngine extends Engine {
     private final SoftDeletesPolicy softDeletesPolicy;
     private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
     private final FlushListeners flushListener;
-    private final AsyncIOProcessor<Translog.Location> translogSyncProcessor;
+    private final AsyncIOProcessor<Tuple<Long, Translog.Location>> translogSyncProcessor;
 
     private final CompletionStatsCache completionStatsCache;
 
@@ -617,12 +617,23 @@ public class InternalEngine extends Engine {
         return getTranslog().syncNeeded();
     }
 
-    private AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) {
+    private AsyncIOProcessor<Tuple<Long, Translog.Location>> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) {
         return new AsyncIOProcessor<>(logger, 1024, threadContext) {
             @Override
-            protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
+            protected void write(List<Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>>> candidates) throws IOException {
                 try {
-                    final boolean synced = translog.ensureSynced(candidates.stream().map(Tuple::v1));
+                    Translog.Location location = Translog.Location.EMPTY;
+                    long processGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
+                    for (Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>> syncMarkers : candidates) {
+                        Tuple<Long, Translog.Location> marker = syncMarkers.v1();
+                        long globalCheckpointToSync = marker.v1();
+                        if (globalCheckpointToSync != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                            processGlobalCheckpoint = SequenceNumbers.max(processGlobalCheckpoint, globalCheckpointToSync);
+                        }
+                        location = location.compareTo(marker.v2()) >= 0 ? location : marker.v2();
+                    }
+
+                    final boolean synced = translog.ensureSynced(location, processGlobalCheckpoint);
                     if (synced) {
                         revisitIndexDeletionPolicyOnTranslogSynced();
                     }
@@ -639,7 +650,12 @@ public class InternalEngine extends Engine {
 
     @Override
     public void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener) {
-        translogSyncProcessor.put(location, listener);
+        translogSyncProcessor.put(new Tuple<>(SequenceNumbers.NO_OPS_PERFORMED, location), listener);
+    }
+
+    @Override
+    public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener) {
+        translogSyncProcessor.put(new Tuple<>(globalCheckpoint, Translog.Location.EMPTY), listener);
     }
 
     @Override

+ 5 - 0
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -338,6 +338,11 @@ public class ReadOnlyEngine extends Engine {
         listener.accept(null);
     }
 
+    @Override
+    public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener) {
+        listener.accept(null);
+    }
+
     @Override
     public void syncTranslog() {}
 

+ 15 - 11
server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java

@@ -62,7 +62,9 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
             actionFilters,
             Request::new,
             Request::new,
-            ThreadPool.Names.MANAGEMENT
+            ThreadPool.Names.WRITE,
+            false,
+            true
         );
     }
 
@@ -77,24 +79,26 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
         IndexShard indexShard,
         ActionListener<PrimaryResult<Request, ReplicationResponse>> listener
     ) {
-        ActionListener.completeWith(listener, () -> {
-            maybeSyncTranslog(indexShard);
-            return new PrimaryResult<>(request, new ReplicationResponse());
-        });
+        maybeSyncTranslog(indexShard, listener.map(v -> new PrimaryResult<>(request, new ReplicationResponse())));
     }
 
     @Override
     protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
-        ActionListener.completeWith(listener, () -> {
-            maybeSyncTranslog(replica);
-            return new ReplicaResult();
-        });
+        maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult()));
     }
 
-    private static void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
+    private static <T> void maybeSyncTranslog(IndexShard indexShard, ActionListener<Void> listener) {
         if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST
             && indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()) {
-            indexShard.sync();
+            indexShard.syncGlobalCheckpoint(indexShard.getLastKnownGlobalCheckpoint(), e -> {
+                if (e == null) {
+                    listener.onResponse(null);
+                } else {
+                    listener.onFailure(e);
+                }
+            });
+        } else {
+            listener.onResponse(null);
         }
     }
 

+ 11 - 0
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -3615,6 +3615,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         getEngine().asyncEnsureTranslogSynced(location, syncListener);
     }
 
+    /**
+     * This method provides the same behavior as #sync but for persisting the global checkpoint. It will initiate a sync
+     * if the request global checkpoint is greater than the currently persisted global checkpoint. However, same as #sync it
+     * will not ensure that the request global checkpoint is available to be synced. It is the caller's duty to only call this
+     * method with a valid processed global checkpoint that is available to sync.
+     */
+    public void syncGlobalCheckpoint(long globalCheckpoint, Consumer<Exception> syncListener) {
+        verifyNotClosed();
+        getEngine().asyncEnsureGlobalCheckpointSynced(globalCheckpoint, syncListener);
+    }
+
     public void sync() throws IOException {
         verifyNotClosed();
         getEngine().syncTranslog();

+ 9 - 23
server/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -51,7 +51,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -839,15 +838,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     }
 
     /**
-     * Ensures that the given location has be synced / written to the underlying storage.
+     * Ensures that the given location and global checkpoint has be synced / written to the underlying storage.
      *
      * @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
      */
-    public boolean ensureSynced(Location location) throws IOException {
+    public boolean ensureSynced(Location location, long globalCheckpoint) throws IOException {
         try (ReleasableLock lock = readLock.acquire()) {
-            if (location.generation == current.getGeneration()) { // if we have a new one it's already synced
+            // if we have a new generation and the persisted global checkpoint is greater than or equal to the sync global checkpoint it's
+            // already synced
+            long persistedGlobalCheckpoint = current.getLastSyncedCheckpoint().globalCheckpoint;
+            if (location.generation == current.getGeneration() || persistedGlobalCheckpoint < globalCheckpoint) {
                 ensureOpen();
-                return current.syncUpTo(location.translogLocation + location.size);
+                return current.syncUpTo(location.translogLocation + location.size, globalCheckpoint);
             }
         } catch (final Exception ex) {
             closeOnTragicEvent(ex);
@@ -856,24 +858,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return false;
     }
 
-    /**
-     * Ensures that all locations in the given stream have been synced / written to the underlying storage.
-     * This method allows for internal optimization to minimize the amount of fsync operations if multiple
-     * locations must be synced.
-     *
-     * @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
-     */
-    public boolean ensureSynced(Stream<Location> locations) throws IOException {
-        final Optional<Location> max = locations.max(Location::compareTo);
-        // we only need to sync the max location since it will sync all other
-        // locations implicitly
-        if (max.isPresent()) {
-            return ensureSynced(max.get());
-        } else {
-            return false;
-        }
-    }
-
     /**
      * Closes the translog if the current translog writer experienced a tragic exception.
      *
@@ -929,6 +913,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
     public static class Location implements Comparable<Location> {
 
+        public static Location EMPTY = new Location(0, 0, 0);
+
         public final long generation;
         public final long translogLocation;
         public final int size;

+ 11 - 4
server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

@@ -346,7 +346,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
      * raising the exception.
      */
     public void sync() throws IOException {
-        syncUpTo(Long.MAX_VALUE);
+        syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO);
     }
 
     /**
@@ -462,10 +462,17 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
      *
      * @return <code>true</code> if this call caused an actual sync operation
      */
-    final boolean syncUpTo(long offset) throws IOException {
-        if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
+    final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException {
+        if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) {
+            assert globalCheckpointToPersist <= globalCheckpointSupplier.getAsLong()
+                : "globalCheckpointToPersist ["
+                    + globalCheckpointToPersist
+                    + "] greater than global checkpoint ["
+                    + globalCheckpointSupplier.getAsLong()
+                    + "]";
             synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
-                if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
+                if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist)
+                    && syncNeeded()) {
                     // double checked locking - we don't want to fsync unless we have to and now that we have
                     // the lock we should check again since if this code is busy we might have fsynced enough already
                     final Checkpoint checkpointToSync;

+ 13 - 2
server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java

@@ -27,8 +27,13 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.Collections;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -100,6 +105,11 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
 
         when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint);
         when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint);
+        doAnswer(invocation -> {
+            Consumer<Exception> argument = invocation.getArgument(1);
+            argument.accept(null);
+            return null;
+        }).when(indexShard).syncGlobalCheckpoint(anyLong(), any());
 
         final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
             Settings.EMPTY,
@@ -123,9 +133,10 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
 
         if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
             verify(indexShard, never()).sync();
+            verify(indexShard, never()).syncGlobalCheckpoint(anyLong(), any());
         } else {
-            verify(indexShard).sync();
+            verify(indexShard, never()).sync();
+            verify(indexShard).syncGlobalCheckpoint(eq(globalCheckpoint), any());
         }
     }
-
 }

+ 63 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -1534,6 +1534,69 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(shard);
     }
 
+    public void testAsyncPersistGlobalCheckpointSync() throws InterruptedException, IOException {
+        final ShardId shardId = new ShardId("index", "_na_", 0);
+        final ShardRouting shardRouting = TestShardRouting.newShardRouting(
+            shardId,
+            randomAlphaOfLength(8),
+            true,
+            ShardRoutingState.INITIALIZING,
+            RecoverySource.EmptyStoreRecoverySource.INSTANCE
+        );
+        final Settings settings = indexSettings(Version.CURRENT, 1, 2).build();
+        final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
+        IndexShard shard = newShard(
+            shardRouting,
+            indexMetadata.build(),
+            null,
+            new InternalEngineFactory(),
+            ignoredShardId -> {},
+            RetentionLeaseSyncer.EMPTY
+        );
+        recoverShardFromStore(shard);
+
+        final int maxSeqNo = randomIntBetween(0, 128);
+        for (int i = 0; i <= maxSeqNo; i++) {
+            EngineTestCase.generateNewSeqNo(shard.getEngine());
+        }
+        final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo;
+        shard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint);
+        shard.updateGlobalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getLocalCheckpoint());
+
+        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
+        Thread[] thread = new Thread[randomIntBetween(3, 5)];
+        CountDownLatch latch = new CountDownLatch(thread.length);
+        for (int i = 0; i < thread.length; i++) {
+            thread[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                        for (int i = 0; i < 10000; i++) {
+                            semaphore.acquire();
+                            shard.syncGlobalCheckpoint(
+                                randomLongBetween(0, shard.getLastKnownGlobalCheckpoint()),
+                                (ex) -> semaphore.release()
+                            );
+                        }
+                    } catch (Exception ex) {
+                        throw new RuntimeException(ex);
+                    }
+                }
+            };
+            thread[i].start();
+        }
+
+        for (int i = 0; i < thread.length; i++) {
+            thread[i].join();
+        }
+        assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
+        assertEquals(shard.getLastKnownGlobalCheckpoint(), shard.getLastSyncedGlobalCheckpoint());
+
+        closeShards(shard);
+    }
+
     public void testShardStats() throws IOException {
 
         IndexShard shard = newStartedShard();

+ 52 - 43
server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -1024,7 +1024,7 @@ public class TranslogTests extends ESTestCase {
                             fail("duplicate op [" + op + "], old entry at " + location);
                         }
                         if (id % writers.length == threadId) {
-                            translog.ensureSynced(location);
+                            translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO);
                         }
                         if (id % flushEveryOps == 0) {
                             synchronized (flushMutex) {
@@ -1162,67 +1162,72 @@ public class TranslogTests extends ESTestCase {
         logger.info("--> test done. total ops written [{}]", writtenOps.size());
     }
 
-    public void testSyncUpTo() throws IOException {
-        int translogOperations = randomIntBetween(10, 100);
-        int count = 0;
-        for (int op = 0; op < translogOperations; op++) {
-            int seqNo = ++count;
-            final Translog.Location location = translog.add(TranslogOperationsUtils.indexOp("" + op, seqNo, primaryTerm.get()));
-            if (randomBoolean()) {
-                assertTrue("at least one operation pending", translog.syncNeeded());
-                assertTrue("this operation has not been synced", translog.ensureSynced(location));
-                // we are the last location so everything should be synced
-                assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded());
-                seqNo = ++count;
-                translog.add(TranslogOperationsUtils.indexOp("" + op, seqNo, primaryTerm.get()));
-                assertTrue("one pending operation", translog.syncNeeded());
-                assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now
-                assertTrue("we only synced a previous operation yet", translog.syncNeeded());
-            }
-            if (rarely()) {
-                translog.rollGeneration();
-                assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
-                assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
-            }
-
-            if (randomBoolean()) {
-                translog.sync();
-                assertFalse("translog has been synced already", translog.ensureSynced(location));
-            }
-        }
-    }
+    public void testSyncUpToLocationAndCheckpoint() throws IOException {
+        assertFalse(
+            "translog empty location and not ops performed will not require sync",
+            translog.ensureSynced(Location.EMPTY, SequenceNumbers.UNASSIGNED_SEQ_NO)
+        );
 
-    public void testSyncUpToStream() throws IOException {
-        int iters = randomIntBetween(5, 10);
+        int iters = randomIntBetween(25, 50);
+        Location alreadySynced = Location.EMPTY;
+        long alreadySyncedCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
         for (int i = 0; i < iters; i++) {
             int translogOperations = randomIntBetween(10, 100);
             int count = 0;
-            ArrayList<Location> locations = new ArrayList<>();
+
+            Location location = null;
+            final ArrayList<Location> locations = new ArrayList<>();
+            final ArrayList<Location> locationsInCurrentGeneration = new ArrayList<>();
             for (int op = 0; op < translogOperations; op++) {
                 if (rarely()) {
                     translog.rollGeneration();
+                    locationsInCurrentGeneration.clear();
                 }
-                final Translog.Location location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count)));
+                location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count)));
+                globalCheckpoint.incrementAndGet();
                 locations.add(location);
+                locationsInCurrentGeneration.add(location);
             }
-            Collections.shuffle(locations, random());
+
+            assertFalse("should have been synced on previous iteration", translog.ensureSynced(alreadySynced, alreadySyncedCheckpoint));
+
             if (randomBoolean()) {
                 assertTrue("at least one operation pending", translog.syncNeeded());
-                assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
-                // we are the last location so everything should be synced
+                if (randomBoolean()) {
+                    Location randomLocationToSync = locationsInCurrentGeneration.get(randomInt(locationsInCurrentGeneration.size() - 1));
+                    assertTrue(
+                        "this operation has not been synced",
+                        translog.ensureSynced(randomLocationToSync, SequenceNumbers.UNASSIGNED_SEQ_NO)
+                    );
+                } else {
+                    long globalCheckpointToSync = randomLongBetween(translog.getLastSyncedGlobalCheckpoint() + 1, globalCheckpoint.get());
+                    assertTrue(
+                        "this global checkpoint has not been persisted",
+                        translog.ensureSynced(Location.EMPTY, globalCheckpointToSync)
+                    );
+                }
+                // everything should be synced
                 assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded());
             } else if (rarely()) {
                 translog.rollGeneration();
                 // not syncing now
-                assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream()));
+                assertFalse(
+                    "location is from a previous translog - already synced",
+                    translog.ensureSynced(location, globalCheckpoint.get())
+                );
                 assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
             } else {
                 translog.sync();
-                assertFalse("translog has been synced already", translog.ensureSynced(locations.stream()));
+                assertFalse("translog has been synced already", translog.ensureSynced(location, globalCheckpoint.get()));
             }
-            for (Location location : locations) {
-                assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location));
+
+            Collections.shuffle(locations, random());
+            for (Location l : locations) {
+                assertFalse("all of the locations should be synced: " + l, translog.ensureSynced(l, SequenceNumbers.UNASSIGNED_SEQ_NO));
             }
+
+            alreadySynced = location;
+            alreadySyncedCheckpoint = globalCheckpoint.get();
         }
     }
 
@@ -2550,7 +2555,7 @@ public class TranslogTests extends ESTestCase {
         try {
             Translog.Location location = translog.add(indexOp("2", 1, primaryTerm.get(), lineFileDocs.nextDoc().toString()));
             if (randomBoolean()) {
-                translog.ensureSynced(location);
+                translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO);
             } else {
                 translog.sync();
             }
@@ -3888,7 +3893,11 @@ public class TranslogTests extends ESTestCase {
                             long globalCheckpoint = lastGlobalCheckpoint.get();
                             final boolean synced;
                             if (randomBoolean()) {
-                                synced = translog.ensureSynced(location);
+                                if (randomBoolean()) {
+                                    synced = translog.ensureSynced(location, globalCheckpoint);
+                                } else {
+                                    synced = translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO);
+                                }
                             } else {
                                 translog.sync();
                                 synced = true;