Browse Source

Merge pull request ESQL-1201 from elastic/main

🤖 ESQL: Merge upstream
elasticsearchmachine 2 years ago
parent
commit
32dd9edf6c

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/action/RejectionActionIT.java

@@ -37,7 +37,8 @@ public class RejectionActionIT extends ESIntegTestCase {
             .put("thread_pool.search.size", 1)
             .put("thread_pool.search.queue_size", 1)
             .put("thread_pool.write.size", 1)
-            .put("thread_pool.write.queue_size", 1)
+            // Needs to be 2 since we have concurrent indexing and global checkpoint syncs
+            .put("thread_pool.write.queue_size", 2)
             .put("thread_pool.get.size", 1)
             .put("thread_pool.get.queue_size", 1)
             .build();

+ 4 - 1
server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -160,7 +160,10 @@ public class IndexShardIT extends ESSingleNodeTestCase {
             Translog.Location lastWriteLocation = tlog.getLastWriteLocation();
             try {
                 // the lastWriteLocaltion has a Integer.MAX_VALUE size so we have to create a new one
-                return tlog.ensureSynced(new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0));
+                return tlog.ensureSynced(
+                    new Translog.Location(lastWriteLocation.generation, lastWriteLocation.translogLocation, 0),
+                    SequenceNumbers.UNASSIGNED_SEQ_NO
+                );
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             }

+ 3 - 19
server/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java

@@ -26,22 +26,6 @@ import java.util.concurrent.locks.ReentrantLock;
 public final class KeyedLock<T> {
 
     private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
-    private final boolean fair;
-
-    /**
-     * Creates a new lock
-     * @param fair Use fair locking, ie threads get the lock in the order they requested it
-     */
-    public KeyedLock(boolean fair) {
-        this.fair = fair;
-    }
-
-    /**
-     * Creates a non-fair lock
-     */
-    public KeyedLock() {
-        this(false);
-    }
 
     /**
      * Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired
@@ -90,7 +74,7 @@ public final class KeyedLock<T> {
     }
 
     private ReleasableLock tryCreateNewLock(T key) {
-        KeyLock newLock = new KeyLock(fair);
+        KeyLock newLock = new KeyLock();
         newLock.lock();
         KeyLock keyLock = map.putIfAbsent(key, newLock);
         if (keyLock == null) {
@@ -140,8 +124,8 @@ public final class KeyedLock<T> {
 
     @SuppressWarnings("serial")
     private static final class KeyLock extends ReentrantLock {
-        KeyLock(boolean fair) {
-            super(fair);
+        KeyLock() {
+            super();
         }
 
         private final AtomicInteger count = new AtomicInteger(1);

+ 5 - 0
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -758,6 +758,11 @@ public abstract class Engine implements Closeable {
      */
     public abstract void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener);
 
+    /**
+     * Ensures that the global checkpoint has been persisted to the underlying storage.
+     */
+    public abstract void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener);
+
     public abstract void syncTranslog() throws IOException;
 
     /**

+ 21 - 5
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -179,7 +179,7 @@ public class InternalEngine extends Engine {
     private final SoftDeletesPolicy softDeletesPolicy;
     private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
     private final FlushListeners flushListener;
-    private final AsyncIOProcessor<Translog.Location> translogSyncProcessor;
+    private final AsyncIOProcessor<Tuple<Long, Translog.Location>> translogSyncProcessor;
 
     private final CompletionStatsCache completionStatsCache;
 
@@ -617,12 +617,23 @@ public class InternalEngine extends Engine {
         return getTranslog().syncNeeded();
     }
 
-    private AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) {
+    private AsyncIOProcessor<Tuple<Long, Translog.Location>> createTranslogSyncProcessor(Logger logger, ThreadContext threadContext) {
         return new AsyncIOProcessor<>(logger, 1024, threadContext) {
             @Override
-            protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
+            protected void write(List<Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>>> candidates) throws IOException {
                 try {
-                    final boolean synced = translog.ensureSynced(candidates.stream().map(Tuple::v1));
+                    Translog.Location location = Translog.Location.EMPTY;
+                    long processGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
+                    for (Tuple<Tuple<Long, Translog.Location>, Consumer<Exception>> syncMarkers : candidates) {
+                        Tuple<Long, Translog.Location> marker = syncMarkers.v1();
+                        long globalCheckpointToSync = marker.v1();
+                        if (globalCheckpointToSync != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                            processGlobalCheckpoint = SequenceNumbers.max(processGlobalCheckpoint, globalCheckpointToSync);
+                        }
+                        location = location.compareTo(marker.v2()) >= 0 ? location : marker.v2();
+                    }
+
+                    final boolean synced = translog.ensureSynced(location, processGlobalCheckpoint);
                     if (synced) {
                         revisitIndexDeletionPolicyOnTranslogSynced();
                     }
@@ -639,7 +650,12 @@ public class InternalEngine extends Engine {
 
     @Override
     public void asyncEnsureTranslogSynced(Translog.Location location, Consumer<Exception> listener) {
-        translogSyncProcessor.put(location, listener);
+        translogSyncProcessor.put(new Tuple<>(SequenceNumbers.NO_OPS_PERFORMED, location), listener);
+    }
+
+    @Override
+    public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener) {
+        translogSyncProcessor.put(new Tuple<>(globalCheckpoint, Translog.Location.EMPTY), listener);
     }
 
     @Override

+ 5 - 0
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -338,6 +338,11 @@ public class ReadOnlyEngine extends Engine {
         listener.accept(null);
     }
 
+    @Override
+    public void asyncEnsureGlobalCheckpointSynced(long globalCheckpoint, Consumer<Exception> listener) {
+        listener.accept(null);
+    }
+
     @Override
     public void syncTranslog() {}
 

+ 15 - 11
server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java

@@ -62,7 +62,9 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
             actionFilters,
             Request::new,
             Request::new,
-            ThreadPool.Names.MANAGEMENT
+            ThreadPool.Names.WRITE,
+            false,
+            true
         );
     }
 
@@ -77,24 +79,26 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
         IndexShard indexShard,
         ActionListener<PrimaryResult<Request, ReplicationResponse>> listener
     ) {
-        ActionListener.completeWith(listener, () -> {
-            maybeSyncTranslog(indexShard);
-            return new PrimaryResult<>(request, new ReplicationResponse());
-        });
+        maybeSyncTranslog(indexShard, listener.map(v -> new PrimaryResult<>(request, new ReplicationResponse())));
     }
 
     @Override
     protected void shardOperationOnReplica(Request shardRequest, IndexShard replica, ActionListener<ReplicaResult> listener) {
-        ActionListener.completeWith(listener, () -> {
-            maybeSyncTranslog(replica);
-            return new ReplicaResult();
-        });
+        maybeSyncTranslog(replica, listener.map(v -> new ReplicaResult()));
     }
 
-    private static void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
+    private static <T> void maybeSyncTranslog(IndexShard indexShard, ActionListener<Void> listener) {
         if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST
             && indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()) {
-            indexShard.sync();
+            indexShard.syncGlobalCheckpoint(indexShard.getLastKnownGlobalCheckpoint(), e -> {
+                if (e == null) {
+                    listener.onResponse(null);
+                } else {
+                    listener.onFailure(e);
+                }
+            });
+        } else {
+            listener.onResponse(null);
         }
     }
 

+ 11 - 0
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -3615,6 +3615,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         getEngine().asyncEnsureTranslogSynced(location, syncListener);
     }
 
+    /**
+     * This method provides the same behavior as #sync but for persisting the global checkpoint. It will initiate a sync
+     * if the request global checkpoint is greater than the currently persisted global checkpoint. However, same as #sync it
+     * will not ensure that the request global checkpoint is available to be synced. It is the caller's duty to only call this
+     * method with a valid processed global checkpoint that is available to sync.
+     */
+    public void syncGlobalCheckpoint(long globalCheckpoint, Consumer<Exception> syncListener) {
+        verifyNotClosed();
+        getEngine().asyncEnsureGlobalCheckpointSynced(globalCheckpoint, syncListener);
+    }
+
     public void sync() throws IOException {
         verifyNotClosed();
         getEngine().syncTranslog();

+ 9 - 23
server/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -51,7 +51,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -839,15 +838,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     }
 
     /**
-     * Ensures that the given location has be synced / written to the underlying storage.
+     * Ensures that the given location and global checkpoint has be synced / written to the underlying storage.
      *
      * @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
      */
-    public boolean ensureSynced(Location location) throws IOException {
+    public boolean ensureSynced(Location location, long globalCheckpoint) throws IOException {
         try (ReleasableLock lock = readLock.acquire()) {
-            if (location.generation == current.getGeneration()) { // if we have a new one it's already synced
+            // if we have a new generation and the persisted global checkpoint is greater than or equal to the sync global checkpoint it's
+            // already synced
+            long persistedGlobalCheckpoint = current.getLastSyncedCheckpoint().globalCheckpoint;
+            if (location.generation == current.getGeneration() || persistedGlobalCheckpoint < globalCheckpoint) {
                 ensureOpen();
-                return current.syncUpTo(location.translogLocation + location.size);
+                return current.syncUpTo(location.translogLocation + location.size, globalCheckpoint);
             }
         } catch (final Exception ex) {
             closeOnTragicEvent(ex);
@@ -856,24 +858,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return false;
     }
 
-    /**
-     * Ensures that all locations in the given stream have been synced / written to the underlying storage.
-     * This method allows for internal optimization to minimize the amount of fsync operations if multiple
-     * locations must be synced.
-     *
-     * @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
-     */
-    public boolean ensureSynced(Stream<Location> locations) throws IOException {
-        final Optional<Location> max = locations.max(Location::compareTo);
-        // we only need to sync the max location since it will sync all other
-        // locations implicitly
-        if (max.isPresent()) {
-            return ensureSynced(max.get());
-        } else {
-            return false;
-        }
-    }
-
     /**
      * Closes the translog if the current translog writer experienced a tragic exception.
      *
@@ -929,6 +913,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
     public static class Location implements Comparable<Location> {
 
+        public static Location EMPTY = new Location(0, 0, 0);
+
         public final long generation;
         public final long translogLocation;
         public final int size;

+ 11 - 4
server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

@@ -346,7 +346,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
      * raising the exception.
      */
     public void sync() throws IOException {
-        syncUpTo(Long.MAX_VALUE);
+        syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO);
     }
 
     /**
@@ -462,10 +462,17 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
      *
      * @return <code>true</code> if this call caused an actual sync operation
      */
-    final boolean syncUpTo(long offset) throws IOException {
-        if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
+    final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException {
+        if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) {
+            assert globalCheckpointToPersist <= globalCheckpointSupplier.getAsLong()
+                : "globalCheckpointToPersist ["
+                    + globalCheckpointToPersist
+                    + "] greater than global checkpoint ["
+                    + globalCheckpointSupplier.getAsLong()
+                    + "]";
             synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
-                if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
+                if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist)
+                    && syncNeeded()) {
                     // double checked locking - we don't want to fsync unless we have to and now that we have
                     // the lock we should check again since if this code is busy we might have fsynced enough already
                     final Checkpoint checkpointToSync;

+ 1 - 1
server/src/test/java/org/elasticsearch/common/util/concurrent/KeyedLockTests.java

@@ -29,7 +29,7 @@ public class KeyedLockTests extends ESTestCase {
     public void testIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException {
         ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<>();
         ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<>();
-        KeyedLock<String> connectionLock = new KeyedLock<>(randomBoolean());
+        KeyedLock<String> connectionLock = new KeyedLock<>();
         String[] names = new String[randomIntBetween(1, 40)];
         for (int i = 0; i < names.length; i++) {
             names[i] = randomRealisticUnicodeOfLengthBetween(10, 20);

+ 13 - 2
server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java

@@ -27,8 +27,13 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.Collections;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -100,6 +105,11 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
 
         when(indexShard.getLastKnownGlobalCheckpoint()).thenReturn(globalCheckpoint);
         when(indexShard.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint);
+        doAnswer(invocation -> {
+            Consumer<Exception> argument = invocation.getArgument(1);
+            argument.accept(null);
+            return null;
+        }).when(indexShard).syncGlobalCheckpoint(anyLong(), any());
 
         final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
             Settings.EMPTY,
@@ -123,9 +133,10 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
 
         if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
             verify(indexShard, never()).sync();
+            verify(indexShard, never()).syncGlobalCheckpoint(anyLong(), any());
         } else {
-            verify(indexShard).sync();
+            verify(indexShard, never()).sync();
+            verify(indexShard).syncGlobalCheckpoint(eq(globalCheckpoint), any());
         }
     }
-
 }

+ 63 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -1534,6 +1534,69 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(shard);
     }
 
+    public void testAsyncPersistGlobalCheckpointSync() throws InterruptedException, IOException {
+        final ShardId shardId = new ShardId("index", "_na_", 0);
+        final ShardRouting shardRouting = TestShardRouting.newShardRouting(
+            shardId,
+            randomAlphaOfLength(8),
+            true,
+            ShardRoutingState.INITIALIZING,
+            RecoverySource.EmptyStoreRecoverySource.INSTANCE
+        );
+        final Settings settings = indexSettings(Version.CURRENT, 1, 2).build();
+        final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
+        IndexShard shard = newShard(
+            shardRouting,
+            indexMetadata.build(),
+            null,
+            new InternalEngineFactory(),
+            ignoredShardId -> {},
+            RetentionLeaseSyncer.EMPTY
+        );
+        recoverShardFromStore(shard);
+
+        final int maxSeqNo = randomIntBetween(0, 128);
+        for (int i = 0; i <= maxSeqNo; i++) {
+            EngineTestCase.generateNewSeqNo(shard.getEngine());
+        }
+        final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo;
+        shard.updateLocalCheckpointForShard(shardRouting.allocationId().getId(), checkpoint);
+        shard.updateGlobalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getLocalCheckpoint());
+
+        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
+        Thread[] thread = new Thread[randomIntBetween(3, 5)];
+        CountDownLatch latch = new CountDownLatch(thread.length);
+        for (int i = 0; i < thread.length; i++) {
+            thread[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                        for (int i = 0; i < 10000; i++) {
+                            semaphore.acquire();
+                            shard.syncGlobalCheckpoint(
+                                randomLongBetween(0, shard.getLastKnownGlobalCheckpoint()),
+                                (ex) -> semaphore.release()
+                            );
+                        }
+                    } catch (Exception ex) {
+                        throw new RuntimeException(ex);
+                    }
+                }
+            };
+            thread[i].start();
+        }
+
+        for (int i = 0; i < thread.length; i++) {
+            thread[i].join();
+        }
+        assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
+        assertEquals(shard.getLastKnownGlobalCheckpoint(), shard.getLastSyncedGlobalCheckpoint());
+
+        closeShards(shard);
+    }
+
     public void testShardStats() throws IOException {
 
         IndexShard shard = newStartedShard();

+ 52 - 43
server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -1024,7 +1024,7 @@ public class TranslogTests extends ESTestCase {
                             fail("duplicate op [" + op + "], old entry at " + location);
                         }
                         if (id % writers.length == threadId) {
-                            translog.ensureSynced(location);
+                            translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO);
                         }
                         if (id % flushEveryOps == 0) {
                             synchronized (flushMutex) {
@@ -1162,67 +1162,72 @@ public class TranslogTests extends ESTestCase {
         logger.info("--> test done. total ops written [{}]", writtenOps.size());
     }
 
-    public void testSyncUpTo() throws IOException {
-        int translogOperations = randomIntBetween(10, 100);
-        int count = 0;
-        for (int op = 0; op < translogOperations; op++) {
-            int seqNo = ++count;
-            final Translog.Location location = translog.add(TranslogOperationsUtils.indexOp("" + op, seqNo, primaryTerm.get()));
-            if (randomBoolean()) {
-                assertTrue("at least one operation pending", translog.syncNeeded());
-                assertTrue("this operation has not been synced", translog.ensureSynced(location));
-                // we are the last location so everything should be synced
-                assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded());
-                seqNo = ++count;
-                translog.add(TranslogOperationsUtils.indexOp("" + op, seqNo, primaryTerm.get()));
-                assertTrue("one pending operation", translog.syncNeeded());
-                assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now
-                assertTrue("we only synced a previous operation yet", translog.syncNeeded());
-            }
-            if (rarely()) {
-                translog.rollGeneration();
-                assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
-                assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
-            }
-
-            if (randomBoolean()) {
-                translog.sync();
-                assertFalse("translog has been synced already", translog.ensureSynced(location));
-            }
-        }
-    }
+    public void testSyncUpToLocationAndCheckpoint() throws IOException {
+        assertFalse(
+            "translog empty location and not ops performed will not require sync",
+            translog.ensureSynced(Location.EMPTY, SequenceNumbers.UNASSIGNED_SEQ_NO)
+        );
 
-    public void testSyncUpToStream() throws IOException {
-        int iters = randomIntBetween(5, 10);
+        int iters = randomIntBetween(25, 50);
+        Location alreadySynced = Location.EMPTY;
+        long alreadySyncedCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
         for (int i = 0; i < iters; i++) {
             int translogOperations = randomIntBetween(10, 100);
             int count = 0;
-            ArrayList<Location> locations = new ArrayList<>();
+
+            Location location = null;
+            final ArrayList<Location> locations = new ArrayList<>();
+            final ArrayList<Location> locationsInCurrentGeneration = new ArrayList<>();
             for (int op = 0; op < translogOperations; op++) {
                 if (rarely()) {
                     translog.rollGeneration();
+                    locationsInCurrentGeneration.clear();
                 }
-                final Translog.Location location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count)));
+                location = translog.add(indexOp("" + op, op, primaryTerm.get(), Integer.toString(++count)));
+                globalCheckpoint.incrementAndGet();
                 locations.add(location);
+                locationsInCurrentGeneration.add(location);
             }
-            Collections.shuffle(locations, random());
+
+            assertFalse("should have been synced on previous iteration", translog.ensureSynced(alreadySynced, alreadySyncedCheckpoint));
+
             if (randomBoolean()) {
                 assertTrue("at least one operation pending", translog.syncNeeded());
-                assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
-                // we are the last location so everything should be synced
+                if (randomBoolean()) {
+                    Location randomLocationToSync = locationsInCurrentGeneration.get(randomInt(locationsInCurrentGeneration.size() - 1));
+                    assertTrue(
+                        "this operation has not been synced",
+                        translog.ensureSynced(randomLocationToSync, SequenceNumbers.UNASSIGNED_SEQ_NO)
+                    );
+                } else {
+                    long globalCheckpointToSync = randomLongBetween(translog.getLastSyncedGlobalCheckpoint() + 1, globalCheckpoint.get());
+                    assertTrue(
+                        "this global checkpoint has not been persisted",
+                        translog.ensureSynced(Location.EMPTY, globalCheckpointToSync)
+                    );
+                }
+                // everything should be synced
                 assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded());
             } else if (rarely()) {
                 translog.rollGeneration();
                 // not syncing now
-                assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream()));
+                assertFalse(
+                    "location is from a previous translog - already synced",
+                    translog.ensureSynced(location, globalCheckpoint.get())
+                );
                 assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
             } else {
                 translog.sync();
-                assertFalse("translog has been synced already", translog.ensureSynced(locations.stream()));
+                assertFalse("translog has been synced already", translog.ensureSynced(location, globalCheckpoint.get()));
             }
-            for (Location location : locations) {
-                assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location));
+
+            Collections.shuffle(locations, random());
+            for (Location l : locations) {
+                assertFalse("all of the locations should be synced: " + l, translog.ensureSynced(l, SequenceNumbers.UNASSIGNED_SEQ_NO));
             }
+
+            alreadySynced = location;
+            alreadySyncedCheckpoint = globalCheckpoint.get();
         }
     }
 
@@ -2550,7 +2555,7 @@ public class TranslogTests extends ESTestCase {
         try {
             Translog.Location location = translog.add(indexOp("2", 1, primaryTerm.get(), lineFileDocs.nextDoc().toString()));
             if (randomBoolean()) {
-                translog.ensureSynced(location);
+                translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO);
             } else {
                 translog.sync();
             }
@@ -3888,7 +3893,11 @@ public class TranslogTests extends ESTestCase {
                             long globalCheckpoint = lastGlobalCheckpoint.get();
                             final boolean synced;
                             if (randomBoolean()) {
-                                synced = translog.ensureSynced(location);
+                                if (randomBoolean()) {
+                                    synced = translog.ensureSynced(location, globalCheckpoint);
+                                } else {
+                                    synced = translog.ensureSynced(location, SequenceNumbers.UNASSIGNED_SEQ_NO);
+                                }
                             } else {
                                 translog.sync();
                                 synced = true;