Browse Source

Update translog writeLocation for flushListener after commit (#109603)

During flush, the shard can keep processing concurrent indexing requests
which advance the translog location. If all changes are then commited by
the indexWriter, we will update the `flushListener` with a more
up-to-date translog location that is read after the commit.
Yang Wang 1 year ago
parent
commit
bdc65cbc90

+ 5 - 0
docs/changelog/109603.yaml

@@ -0,0 +1,5 @@
+pr: 109603
+summary: Update translog `writeLocation` for `flushListener` after commit
+area: Engine
+type: enhancement
+issues: []

+ 8 - 0
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -2223,6 +2223,14 @@ public class InternalEngine extends Engine {
                     // we need to refresh in order to clear older version values
                     refresh("version_table_flush", SearcherScope.INTERNAL, true);
                     translog.trimUnreferencedReaders();
+                    // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and
+                    // (2) indexWriter has committed all the changes (checks must be done in this order).
+                    // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended.
+                    final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation();
+                    if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) {
+                        assert writeLocationAfterFlush.compareTo(commitLocation) > 0 : writeLocationAfterFlush + " <= " + commitLocation;
+                        commitLocation = writeLocationAfterFlush;
+                    }
                     // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in
                     // the above lines would not lead the engine to think that it recently flushed, when it did not.
                     this.lastFlushTimestamp = lastFlushTimestamp;

+ 74 - 0
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -7737,6 +7737,80 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
+    public void testFlushListenerWithConcurrentIndexing() throws IOException, InterruptedException {
+        engine.close();
+        final var barrierReference = new AtomicReference<CyclicBarrier>();
+        engine = new InternalTestEngine(engine.config()) {
+            @Override
+            protected void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
+                final CyclicBarrier barrier = barrierReference.get();
+                if (barrier != null) {
+                    safeAwait(barrier);
+                    safeAwait(barrier);
+                }
+                super.commitIndexWriter(writer, translog);
+                if (barrier != null) {
+                    safeAwait(barrier);
+                    safeAwait(barrier);
+                }
+            }
+        };
+        recoverFromTranslog(engine, translogHandler, Long.MAX_VALUE);
+        final var barrier = new CyclicBarrier(2);
+        barrierReference.set(barrier);
+
+        // (1) Indexing the 1st doc before flush and it should be visible after flush
+        final Engine.IndexResult result1 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null)));
+        final PlainActionFuture<Long> future1 = new PlainActionFuture<>();
+        engine.addFlushListener(result1.getTranslogLocation(), future1);
+        assertFalse(future1.isDone());
+        final Thread flushThread = new Thread(() -> engine.flush());
+        flushThread.start();
+
+        // (2) Wait till flush thread block before commitIndexWriter and indexing the 2nd doc
+        safeAwait(barrier);
+        final Engine.IndexResult result2 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null)));
+        final PlainActionFuture<Long> future2 = new PlainActionFuture<>();
+        engine.addFlushListener(result2.getTranslogLocation(), future2);
+        assertFalse(future2.isDone());
+
+        // Let flush completes the commit
+        safeAwait(barrier);
+        safeAwait(barrier);
+
+        // Randomly indexing the 3rd doc after commit.
+        final PlainActionFuture<Long> future3;
+        final boolean indexingAfterCommit = randomBoolean();
+        if (indexingAfterCommit) {
+            final Engine.IndexResult result3 = engine.index(indexForDoc(createParsedDoc(randomIdentifier(), null)));
+            future3 = new PlainActionFuture<>();
+            engine.addFlushListener(result3.getTranslogLocation(), future3);
+            assertFalse(future3.isDone());
+        } else {
+            future3 = null;
+        }
+        safeAwait(barrier);
+        flushThread.join();
+
+        // The translog location before flush (1st doc) is always visible
+        assertThat(safeGet(future1), equalTo(engine.getLastCommittedSegmentInfos().getGeneration()));
+
+        if (indexingAfterCommit) {
+            // Indexing after the commit makes indexWriter.hasUncommittedChanges() return true which in turn makes
+            // it unsafe to advance flushListener's commitLocation after commit. That is, the flushListener
+            // will not learn the translog location of the 2nd doc.
+            assertFalse(future2.isDone());
+            // It requires a 2nd flush to make all translog locations to be visible
+            barrierReference.set(null); // remove the flush barrier
+            engine.flush();
+            assertThat(safeGet(future2), equalTo(engine.getLastCommittedSegmentInfos().getGeneration()));
+            assertThat(safeGet(future3), equalTo(engine.getLastCommittedSegmentInfos().getGeneration()));
+        } else {
+            // If no indexing after commit, translog location of the 2nd doc should be visible.
+            assertThat(safeGet(future2), equalTo(engine.getLastCommittedSegmentInfos().getGeneration()));
+        }
+    }
+
     private static void assertCommitGenerations(Map<IndexCommit, Engine.IndexCommitRef> commits, List<Long> expectedGenerations) {
         assertCommitGenerations(commits.values().stream().map(Engine.IndexCommitRef::getIndexCommit).toList(), expectedGenerations);
     }