1
0
Эх сурвалжийг харах

Advance max_seq_no before add operation to Lucene (#38879)

Today when processing an operation on a replica engine (or the 
following engine), we first add it to Lucene, then add it to translog, 
then finally marks its seq_no as completed. If a flush occurs after step1,
but before step-3, the max_seq_no in the commit's user_data will be
smaller than the seq_no of some documents in the Lucene commit.
Nhat Nguyen 6 жил өмнө
parent
commit
5624eee282

+ 10 - 0
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -948,6 +948,7 @@ public class InternalEngine extends Engine {
                 }
             }
         }
+        markSeqNoAsSeen(index.seqNo());
         return plan;
     }
 
@@ -1301,6 +1302,7 @@ public class InternalEngine extends Engine {
                     delete.seqNo(), delete.version());
             }
         }
+        markSeqNoAsSeen(delete.seqNo());
         return plan;
     }
 
@@ -1455,6 +1457,7 @@ public class InternalEngine extends Engine {
     public NoOpResult noOp(final NoOp noOp) {
         NoOpResult noOpResult;
         try (ReleasableLock ignored = readLock.acquire()) {
+            markSeqNoAsSeen(noOp.seqNo());
             noOpResult = innerNoOp(noOp);
         } catch (final Exception e) {
             noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
@@ -2434,6 +2437,13 @@ public class InternalEngine extends Engine {
         localCheckpointTracker.waitForOpsToComplete(seqNo);
     }
 
+    /**
+     * Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
+     */
+    protected final void markSeqNoAsSeen(long seqNo) {
+        localCheckpointTracker.advanceMaxSeqNo(seqNo);
+    }
+
     /**
      * Checks if the given operation has been processed in this engine or not.
      * @return true if the given operation was processed; otherwise false.

+ 9 - 0
server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java

@@ -81,6 +81,15 @@ public class LocalCheckpointTracker {
         return nextSeqNo++;
     }
 
+    /**
+     * Marks the provided sequence number as seen and updates the max_seq_no if needed.
+     */
+    public synchronized void advanceMaxSeqNo(long seqNo) {
+        if (seqNo >= nextSeqNo) {
+            nextSeqNo = seqNo + 1;
+        }
+    }
+
     /**
      * Marks the processing of the provided sequence number as completed as updates the checkpoint if possible.
      *

+ 38 - 0
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -5653,4 +5653,42 @@ public class InternalEngineTests extends EngineTestCase {
             }
         }
     }
+
+    public void testMaxSeqNoInCommitUserData() throws Exception {
+        AtomicBoolean running = new AtomicBoolean(true);
+        Thread rollTranslog = new Thread(() -> {
+            while (running.get() && engine.getTranslog().currentFileGeneration() < 500) {
+                engine.rollTranslogGeneration(); // make adding operations to translog slower
+            }
+        });
+        rollTranslog.start();
+
+        Thread indexing = new Thread(() -> {
+            long seqNo = 0;
+            while (running.get() && seqNo <= 1000) {
+                try {
+                    String id = Long.toString(between(1, 50));
+                    if (randomBoolean()) {
+                        ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
+                        engine.index(replicaIndexForDoc(doc, 1L, seqNo, false));
+                    } else {
+                        engine.delete(replicaDeleteForDoc(id, 1L, seqNo, 0L));
+                    }
+                    seqNo++;
+                } catch (IOException e) {
+                    throw new AssertionError(e);
+                }
+            }
+        });
+        indexing.start();
+
+        int numCommits = between(5, 20);
+        for (int i = 0; i < numCommits; i++) {
+            engine.flush(false, true);
+        }
+        running.set(false);
+        indexing.join();
+        rollTranslog.join();
+        assertMaxSeqNoInCommitUserData(engine);
+    }
 }

+ 31 - 11
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -27,6 +27,8 @@ import org.apache.lucene.document.Field;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReader;
@@ -126,6 +128,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
 import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 
 public abstract class EngineTestCase extends ESTestCase {
@@ -254,18 +257,20 @@ public abstract class EngineTestCase extends ESTestCase {
     @After
     public void tearDown() throws Exception {
         super.tearDown();
-        if (engine != null && engine.isClosed.get() == false) {
-            engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
-            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
-        }
-        if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
-            replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
-            assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
+        try {
+            if (engine != null && engine.isClosed.get() == false) {
+                engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
+                assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
+                assertMaxSeqNoInCommitUserData(engine);
+            }
+            if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
+                replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
+                assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
+                assertMaxSeqNoInCommitUserData(replicaEngine);
+            }
+        } finally {
+            IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
         }
-        IOUtils.close(
-                replicaEngine, storeReplica,
-                engine, store);
-        terminate(threadPool);
     }
 
 
@@ -1067,6 +1072,21 @@ public abstract class EngineTestCase extends ESTestCase {
         }
     }
 
+    /**
+     * Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit.
+     */
+    public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exception {
+        List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
+        for (IndexCommit commit : commits) {
+            try (DirectoryReader reader = DirectoryReader.open(commit)) {
+                AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+                Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
+                assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
+                    greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
+            }
+        }
+    }
+
     public static MapperService createMapperService(String type) throws IOException {
         IndexMetaData indexMetaData = IndexMetaData.builder("test")
             .settings(Settings.builder()

+ 2 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java

@@ -68,6 +68,7 @@ public final class FollowingEngine extends InternalEngine {
     @Override
     protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
         preFlight(index);
+        markSeqNoAsSeen(index.seqNo());
         // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
         final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
         assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
@@ -103,6 +104,7 @@ public final class FollowingEngine extends InternalEngine {
     @Override
     protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
         preFlight(delete);
+        markSeqNoAsSeen(delete.seqNo());
         if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
             // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
             final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(

+ 46 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.engine.EngineTestCase.getDocIds;
+import static org.elasticsearch.index.engine.EngineTestCase.getTranslog;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -659,4 +660,49 @@ public class FollowingEngineTests extends ESTestCase {
                 }
             });
     }
+
+    public void testMaxSeqNoInCommitUserData() throws Exception {
+        final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
+            .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true)
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
+        final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
+        final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
+        try (Store store = createStore(shardId, indexSettings, newDirectory())) {
+            final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
+            try (FollowingEngine engine = createEngine(store, engineConfig)) {
+                AtomicBoolean running = new AtomicBoolean(true);
+                Thread rollTranslog = new Thread(() -> {
+                    while (running.get() && getTranslog(engine).currentFileGeneration() < 500) {
+                        engine.rollTranslogGeneration(); // make adding operations to translog slower
+                    }
+                });
+                rollTranslog.start();
+
+                Thread indexing = new Thread(() -> {
+                    List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, VersionType.EXTERNAL, 2, 50, 500, "id");
+                    engine.advanceMaxSeqNoOfUpdatesOrDeletes(ops.stream().mapToLong(Engine.Operation::seqNo).max().getAsLong());
+                    for (Engine.Operation op : ops) {
+                        if (running.get() == false) {
+                            return;
+                        }
+                        try {
+                            EngineTestCase.applyOperation(engine, op);
+                        } catch (IOException e) {
+                            throw new AssertionError(e);
+                        }
+                    }
+                });
+                indexing.start();
+
+                int numCommits = between(5, 20);
+                for (int i = 0; i < numCommits; i++) {
+                    engine.flush(false, true);
+                }
+                running.set(false);
+                indexing.join();
+                rollTranslog.join();
+                EngineTestCase.assertMaxSeqNoInCommitUserData(engine);
+            }
+        }
+    }
 }