
Record segment generation of ongoing flush (#95922)

The LiveVersionMapArchive records the segment generation that a set of doc IDs belongs to and
clears them once that generation is visible on the search shards. However, while a flush is ongoing
(after committing the IndexWriter and before updating lastCommittedSegmentInfos), concurrent
index/update operations that see a refresh and are moved to the archive could incorrectly be
considered part of the propagated commit. This change records the ongoing commit generation
slightly earlier so that these concurrent index/update operations are handled safely.

relates ES-5728
Pooya Salehi · 2 years ago · commit 6db32e3a9e
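
A small standalone model may help illustrate the race and the ordering fix. This is a hedged
sketch, not the Elasticsearch implementation: the ArchiveSketch class and all of its members
(archive, flush, onGenerationVisible, preCommitGeneration) are hypothetical stand-ins for the
interplay between LiveVersionMapArchive, refresh, and flush described above.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

class ArchiveSketch {
    // doc IDs archived per "safe to clear" segment generation
    private final ConcurrentSkipListMap<Long, Set<String>> archived = new ConcurrentSkipListMap<>();
    // generation of the commit currently being written, or of the last finished commit
    private final AtomicLong preCommitGeneration = new AtomicLong(-1);

    // Flush path: record the generation of the commit we are about to write
    // *before* committing, so concurrent archive() calls can see it. This is
    // the ordering change the commit above makes.
    void flush(long lastCommittedGeneration, Runnable commitIndexWriter) {
        preCommitGeneration.set(lastCommittedGeneration + 1);
        commitIndexWriter.run();
    }

    // Refresh path: a doc archived now is only guaranteed to be contained in
    // the commit *after* the one currently in flight, so it must not be
    // cleared before that later generation becomes visible.
    void archive(String docId) {
        long safeGen = preCommitGeneration.get() + 1;
        archived.computeIfAbsent(safeGen, g -> ConcurrentHashMap.newKeySet()).add(docId);
    }

    // Called once a generation becomes visible on the search shards: drop
    // every entry whose safe generation is now covered.
    void onGenerationVisible(long visibleGeneration) {
        archived.headMap(visibleGeneration, true).clear();
    }
}

Without the early preCommitGeneration.set(...), a doc indexed and refreshed between the
IndexWriter commit and the lastCommittedSegmentInfos update would be tagged with the in-flight
commit's generation and cleared as soon as that commit became visible, even though the commit
does not contain it.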

+ 13 - 1
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -145,8 +145,10 @@ public class InternalEngine extends Engine {
     private final LiveVersionMapArchive liveVersionMapArchive;
     // Records the last known generation during which LiveVersionMap was in unsafe mode. This indicates that only after this
     // generation it is safe to rely on the LiveVersionMap for a real-time get.
-    // TODO: can we move this entirely into the stateless plugin?
+    // TODO: move the following two to the stateless plugin
     private final AtomicLong lastUnsafeSegmentGenerationForGets = new AtomicLong(-1);
+    // Records the segment generation for the currently ongoing commit if any, or the last finished commit otherwise.
+    private final AtomicLong preCommitSegmentGeneration = new AtomicLong(-1);
 
     private volatile SegmentInfos lastCommittedSegmentInfos;
 
@@ -2101,6 +2103,12 @@ public class InternalEngine extends Engine {
                         translog.rollGeneration();
                         logger.trace("starting commit for flush; commitTranslog=true");
                         long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong();
+                        // Pre-emptively record the upcoming segment generation so that the live version map archive records
+                        // the correct segment generation for doc IDs that go to the archive while a flush is in progress. Otherwise,
+                        // if new docs get indexed/updated right after committing the IndexWriter and a refresh moves them to the
+                        // archive, we would clear them from the archive once that segment generation is visible on the search
+                        // shards, even though those changes were not included in the commit since they happened right after it.
+                        preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1);
                         commitIndexWriter(indexWriter, translog);
                         logger.trace("finished commit for flush");
                         // we need to refresh in order to clear older version values
@@ -3238,4 +3246,8 @@ public class InternalEngine extends Engine {
         assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
         return true;
     }
+
+    protected long getPreCommitSegmentGeneration() {
+        return preCommitSegmentGeneration.get();
+    }
 }
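
For context, the stateless-plugin side that consumes this hook is not part of this diff; the
subclass below is a hypothetical sketch (SketchEngine and safeToClearArchiveUpTo are invented
names, and the exact clearing condition is an assumption) of how the new getter might be used.

class SketchEngine extends InternalEngine {
    SketchEngine(EngineConfig config) {
        super(config);
    }

    // Compare the generation visible on the search shards against the
    // generation recorded before the IndexWriter commit, rather than against
    // lastCommittedSegmentInfos, which lags while a flush is in progress.
    // Strictly greater is the conservative choice: entries archived while
    // that commit was in flight may not be contained in it.
    boolean safeToClearArchiveUpTo(long visibleGeneration) {
        return visibleGeneration > getPreCommitSegmentGeneration();
    }
}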

+ 37 - 0
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -673,6 +673,43 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
+    public void testPreCommitRecordedSegmentGeneration() throws IOException {
+        engine.close();
+        final AtomicLong preCommitGen = new AtomicLong(-1);
+        final AtomicLong lastSegmentInfoGenUponCommit = new AtomicLong(-1);
+        final AtomicLong postFlushSegmentInfoGen = new AtomicLong(-1);
+        engine = new InternalEngine(engine.config()) {
+
+            @Override
+            protected void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
+                preCommitGen.set(getPreCommitSegmentGeneration());
+                lastSegmentInfoGenUponCommit.set(getLastCommittedSegmentInfos().getGeneration());
+                super.commitIndexWriter(writer, translog);
+            }
+
+            @Override
+            public void flush(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {
+                super.flush(force, waitIfOngoing, listener);
+                postFlushSegmentInfoGen.set(getLastCommittedSegmentInfos().getGeneration());
+                assertThat(getPreCommitSegmentGeneration(), equalTo(preCommitGen.get()));
+            }
+        };
+        if (randomBoolean()) {
+            engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+        } else {
+            engine.skipTranslogRecovery();
+        }
+        engine.ensureCanFlush(); // recovered already
+        ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
+        engine.index(indexForDoc(doc));
+        engine.flush();
+        assertThat(preCommitGen.get(), greaterThan(-1L));
+        assertThat(lastSegmentInfoGenUponCommit.get(), greaterThan(-1L));
+        assertThat(postFlushSegmentInfoGen.get(), greaterThan(-1L));
+        assertThat(preCommitGen.get(), equalTo(lastSegmentInfoGenUponCommit.get() + 1));
+        assertThat(preCommitGen.get(), equalTo(postFlushSegmentInfoGen.get()));
+    }
+
     public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
         final int docs = randomIntBetween(1, 4096);
         final List<Long> seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toCollection(ArrayList::new));