
Rebuild version map when opening internal engine (#43202)

With this change, we rebuild the live version map and local checkpoint
from the documents (including soft-deleted ones) in the safe commit when
opening an internal engine. This allows us to safely prune away the _id
of all soft-deleted documents, as the version map is always in sync with
the Lucene index.

Relates #40741
Supersedes #42979
Nhat Nguyen committed 6 years ago · commit bbc29bb0fd
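
The rule the rebuild relies on is easy to see in isolation: for every document whose seq_no is above the local checkpoint of the safe commit, keep the entry with the highest seq_no per _id, and once a refresh makes those documents searchable, only delete tombstones need to stay in the map. A toy, self-contained sketch of just that rule (illustrative names only, Java 16+ records for brevity — not engine code; the real implementation is restoreVersionMapAndCheckpointTracker in InternalEngine.java below):

    import java.util.HashMap;
    import java.util.Map;

    public class RebuildSketch {
        // Stand-in for the (_id, seq_no, tombstone) doc values read from one Lucene document.
        record Doc(String id, long seqNo, boolean tombstone) {}

        public static void main(String[] args) {
            // Documents (live and soft-deleted) from the safe commit with seq_no above the checkpoint.
            Doc[] docs = {
                new Doc("a", 2, false),
                new Doc("a", 3, true),   // the later delete of "a" wins: higher seq_no
                new Doc("b", 4, false),
            };
            Map<String, Doc> versionMap = new HashMap<>();
            for (Doc doc : docs) {
                versionMap.merge(doc.id(), doc, (curr, next) -> next.seqNo() > curr.seqNo() ? next : curr);
            }
            // A refresh prunes live entries from the map; only delete tombstones must survive.
            versionMap.values().removeIf(doc -> doc.tombstone() == false);
            System.out.println(versionMap); // {a=Doc[id=a, seqNo=3, tombstone=true]}
        }
    }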

+0 -38  server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

@@ -27,7 +27,6 @@ import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.DocValuesFormat;
 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.document.LatLonDocValuesField;
-import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.index.BinaryDocValues;
 import org.apache.lucene.index.CorruptIndexException;
@@ -95,7 +94,6 @@ import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.index.analysis.AnalyzerScope;
 import org.elasticsearch.index.analysis.NamedAnalyzer;
 import org.elasticsearch.index.fielddata.IndexFieldData;
-import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -105,7 +103,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.function.LongConsumer;
 
 public class Lucene {
     public static final String LATEST_DOC_VALUES_FORMAT = "Lucene70";
@@ -1050,39 +1047,4 @@ public class Lucene {
             }
         };
     }
-
-    /**
-     * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive)
-     * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
-     *
-     * @param directoryReader the directory reader to scan
-     * @param fromSeqNo       the lower bound of a range of seq_no to scan (inclusive)
-     * @param toSeqNo         the upper bound of a range of seq_no to scan (inclusive)
-     * @param onNewSeqNo      the callback to be called whenever a new valid sequence number is found
-     */
-    public static void scanSeqNosInReader(DirectoryReader directoryReader, long fromSeqNo, long toSeqNo,
-                                          LongConsumer onNewSeqNo) throws IOException {
-        final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader);
-        final IndexSearcher searcher = new IndexSearcher(reader);
-        searcher.setQueryCache(null);
-        final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
-        final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
-        for (LeafReaderContext leaf : reader.leaves()) {
-            final Scorer scorer = weight.scorer(leaf);
-            if (scorer == null) {
-                continue;
-            }
-            final DocIdSetIterator docIdSetIterator = scorer.iterator();
-            final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
-            int docId;
-            while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
-                if (seqNoDocValues == null || seqNoDocValues.advanceExact(docId) == false) {
-                    throw new IllegalStateException("seq_no doc_values not found for doc_id=" + docId);
-                }
-                final long seqNo = seqNoDocValues.longValue();
-                assert fromSeqNo <= seqNo && seqNo <= toSeqNo : "from_seq_no=" + fromSeqNo + " seq_no=" + seqNo + " to_seq_no=" + toSeqNo;
-                onNewSeqNo.accept(seqNo);
-            }
-        }
-    }
 }

+90 -0  server/src/main/java/org/elasticsearch/index/engine/CombinedDocValues.java

@@ -0,0 +1,90 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.NumericDocValues;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.VersionFieldMapper;
+
+import java.io.IOException;
+import java.util.Objects;
+
+final class CombinedDocValues {
+    private final NumericDocValues versionDV;
+    private final NumericDocValues seqNoDV;
+    private final NumericDocValues primaryTermDV;
+    private final NumericDocValues tombstoneDV;
+    private final NumericDocValues recoverySource;
+
+    CombinedDocValues(LeafReader leafReader) throws IOException {
+        this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
+        this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
+        this.primaryTermDV = Objects.requireNonNull(
+            leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
+        this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
+        this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
+    }
+
+    long docVersion(int segmentDocId) throws IOException {
+        assert versionDV.docID() < segmentDocId;
+        if (versionDV.advanceExact(segmentDocId) == false) {
+            throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
+        }
+        return versionDV.longValue();
+    }
+
+    long docSeqNo(int segmentDocId) throws IOException {
+        assert seqNoDV.docID() < segmentDocId;
+        if (seqNoDV.advanceExact(segmentDocId) == false) {
+            throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
+        }
+        return seqNoDV.longValue();
+    }
+
+    long docPrimaryTerm(int segmentDocId) throws IOException {
+        if (primaryTermDV == null) {
+            return -1L;
+        }
+        assert primaryTermDV.docID() < segmentDocId;
+        // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
+        if (primaryTermDV.advanceExact(segmentDocId) == false) {
+            return -1;
+        }
+        return primaryTermDV.longValue();
+    }
+
+    boolean isTombstone(int segmentDocId) throws IOException {
+        if (tombstoneDV == null) {
+            return false;
+        }
+        assert tombstoneDV.docID() < segmentDocId;
+        return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
+    }
+
+    boolean hasRecoverySource(int segmentDocId) throws IOException {
+        if (recoverySource == null) {
+            return false;
+        }
+        assert recoverySource.docID() < segmentDocId;
+        return recoverySource.advanceExact(segmentDocId);
+    }
+}
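
A usage note on the class above: CombinedDocValues is package-private and wraps one leaf's doc-values iterators, so a caller must feed it strictly ascending doc IDs within a single leaf (advanceExact only moves forward, as the asserts check). A hedged sketch of a same-package caller — visitDocsMetadata and ascendingDocIds are made up for illustration; assumed imports are org.apache.lucene.index.LeafReader and java.io.IOException:

    static void visitDocsMetadata(LeafReader leafReader, int[] ascendingDocIds) throws IOException {
        final CombinedDocValues dv = new CombinedDocValues(leafReader);
        for (int docId : ascendingDocIds) { // strictly ascending within this leaf
            final long primaryTerm = dv.docPrimaryTerm(docId);
            if (primaryTerm == -1L) {
                continue; // nested (child) doc: it carries no per-operation metadata
            }
            final long seqNo = dv.docSeqNo(docId);
            final long version = dv.docVersion(docId);
            final boolean deleted = dv.isTombstone(docId);
            // consume (seqNo, version, primaryTerm, deleted) for this document
        }
    }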

+99 -43  server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
 import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
@@ -31,17 +32,23 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
@@ -68,12 +75,14 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -94,7 +103,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -109,7 +117,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
-import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class InternalEngine extends Engine {
@@ -203,6 +211,7 @@ public class InternalEngine extends Engine {
                 this.softDeletesPolicy = newSoftDeletesPolicy();
                 this.combinedDeletionPolicy =
                     new CombinedDeletionPolicy(logger, translogDeletionPolicy, softDeletesPolicy, translog::getLastSyncedGlobalCheckpoint);
+                this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
                 writer = createWriter();
                 bootstrapAppendOnlyInfoFromWriter(writer);
                 historyUUID = loadHistoryUUID(writer);
@@ -232,11 +241,17 @@ public class InternalEngine extends Engine {
             for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
                 this.internalSearcherManager.addListener(listener);
             }
-            this.localCheckpointTracker = createLocalCheckpointTracker(engineConfig, lastCommittedSegmentInfos, logger,
-                () -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier);
             this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
             this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
             maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
+            if (softDeleteEnabled && localCheckpointTracker.getCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
+                try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
+                    restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
+                } catch (IOException e) {
+                    throw new EngineCreationFailureException(config().getShardId(),
+                        "failed to restore version map and local checkpoint tracker", e);
+                }
+            }
             success = true;
         } finally {
             if (success == false) {
@@ -250,30 +265,16 @@ public class InternalEngine extends Engine {
         logger.trace("created new InternalEngine");
     }
 
-    private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig engineConfig, SegmentInfos lastCommittedSegmentInfos,
-        Logger logger, Supplier<Searcher> searcherSupplier, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
-        try {
-            final SequenceNumbers.CommitInfo seqNoStats =
-                SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
-            final long maxSeqNo = seqNoStats.maxSeqNo;
-            final long localCheckpoint = seqNoStats.localCheckpoint;
-            logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
-            final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
-            // Operations that are optimized using max_seq_no_of_updates optimization must not be processed twice; otherwise, they will
-            // create duplicates in Lucene. To avoid this we check the LocalCheckpointTracker to see if an operation was already processed.
-            // Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and
-            // Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to
-            // disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker.
-            if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-                try (Searcher searcher = searcherSupplier.get()) {
-                    Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo,
-                        tracker::markSeqNoAsCompleted);
-                }
-            }
-            return tracker;
-        } catch (IOException ex) {
-            throw new EngineCreationFailureException(engineConfig.getShardId(), "failed to create local checkpoint tracker", ex);
-        }
+    private LocalCheckpointTracker createLocalCheckpointTracker(
+        BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
+        final long maxSeqNo;
+        final long localCheckpoint;
+        final SequenceNumbers.CommitInfo seqNoStats =
+            SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
+        maxSeqNo = seqNoStats.maxSeqNo;
+        localCheckpoint = seqNoStats.localCheckpoint;
+        logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
+        return localCheckpointTrackerSupplier.apply(maxSeqNo, localCheckpoint);
     }
 
     private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
@@ -676,21 +677,26 @@ public class InternalEngine extends Engine {
         LUCENE_DOC_NOT_FOUND
     }
 
+    private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) {
+        Objects.requireNonNull(versionValue);
+        if (seqNo > versionValue.seqNo) {
+            return OpVsLuceneDocStatus.OP_NEWER;
+        } else if (seqNo == versionValue.seqNo) {
+            assert versionValue.term == primaryTerm : "primary term not matched; id=" + id + " seq_no=" + seqNo
+                + " op_term=" + primaryTerm + " existing_term=" + versionValue.term;
+            return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+        } else {
+            return OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
+        }
+    }
+
     private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
         assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
         final OpVsLuceneDocStatus status;
         VersionValue versionValue = getVersionFromMap(op.uid().bytes());
         assert incrementVersionLookup();
         if (versionValue != null) {
-            if (op.seqNo() > versionValue.seqNo) {
-                status = OpVsLuceneDocStatus.OP_NEWER;
-            } else if (op.seqNo() == versionValue.seqNo) {
-                assert versionValue.term == op.primaryTerm() : "primary term not matched; id=" + op.id() + " seq_no=" + op.seqNo()
-                    + " op_term=" + op.primaryTerm() + " existing_term=" + versionValue.term;
-                status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
-            } else {
-                status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
-            }
+            status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
         } else {
             // load from index
             assert incrementIndexVersionLookup();
@@ -1875,8 +1881,9 @@ public class InternalEngine extends Engine {
     }
 
     // for testing
-    final Collection<DeleteVersionValue> getDeletedTombstones() {
-        return versionMap.getAllTombstones().values();
+    final Map<BytesRef, VersionValue> getVersionMap() {
+        return Stream.concat(versionMap.getAllCurrent().entrySet().stream(), versionMap.getAllTombstones().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
     @Override
@@ -2507,10 +2514,6 @@ public class InternalEngine extends Engine {
         return true;
     }
 
-    int getVersionMapSize() {
-        return versionMap.getAllCurrent().size();
-    }
-
     boolean isSafeAccessRequired() {
         return versionMap.isSafeAccessRequired();
     }
@@ -2773,4 +2776,57 @@ public class InternalEngine extends Engine {
         final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUUID);
         store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, engineConfig.getIndexSettings().getIndexVersionCreated());
     }
+
+    /**
+     * Restores the live version map and local checkpoint of this engine using documents (including soft-deleted)
+     * after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
+     * are in sync with the Lucene commit.
+     */
+    private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
+        final IndexSearcher searcher = new IndexSearcher(directoryReader);
+        searcher.setQueryCache(null);
+        final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getLocalCheckpoint() + 1, Long.MAX_VALUE);
+        final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
+        for (LeafReaderContext leaf : directoryReader.leaves()) {
+            final Scorer scorer = weight.scorer(leaf);
+            if (scorer == null) {
+                continue;
+            }
+            final CombinedDocValues dv = new CombinedDocValues(leaf.reader());
+            final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
+            final DocIdSetIterator iterator = scorer.iterator();
+            int docId;
+            while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
+                final long primaryTerm = dv.docPrimaryTerm(docId);
+                if (primaryTerm == -1L) {
+                    continue; // skip children docs which do not have primary term
+                }
+                final long seqNo = dv.docSeqNo(docId);
+                localCheckpointTracker.markSeqNoAsCompleted(seqNo);
+                idFieldVisitor.reset();
+                leaf.reader().document(docId, idFieldVisitor);
+                if (idFieldVisitor.getId() == null) {
+                    assert dv.isTombstone(docId);
+                    continue;
+                }
+                final BytesRef uid = new Term(IdFieldMapper.NAME, Uid.encodeId(idFieldVisitor.getId())).bytes();
+                try (Releasable ignored = versionMap.acquireLock(uid)) {
+                    final VersionValue curr = versionMap.getUnderLock(uid);
+                    if (curr == null ||
+                        compareOpToVersionMapOnSeqNo(idFieldVisitor.getId(), seqNo, primaryTerm, curr) == OpVsLuceneDocStatus.OP_NEWER) {
+                        if (dv.isTombstone(docId)) {
+                            // use 0L for the start time so we can prune this delete tombstone quickly
+                            // when the local checkpoint advances (i.e., after a recovery completed).
+                            final long startTime = 0L;
+                            versionMap.putDeleteUnderLock(uid, new DeleteVersionValue(dv.docVersion(docId), seqNo, primaryTerm, startTime));
+                        } else {
+                            versionMap.putIndexUnderLock(uid, new IndexVersionValue(null, dv.docVersion(docId), seqNo, primaryTerm));
+                        }
+                    }
+                }
+            }
+        }
+        // remove live entries in the version map
+        refresh("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL, true);
+    }
 }
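
Two Lucene details make the scan in restoreVersionMapAndCheckpointTracker possible: _seq_no is indexed both as a point (so LongPoint.newRangeQuery can select the range) and as doc values (so each hit's exact value is readable), and Lucene.wrapAllDocsLive un-hides soft-deleted documents from the searcher. A minimal sketch of just the query setup, assuming a directoryReader and the recovered localCheckpoint are in scope:

    // Select every document, live or soft-deleted, above the recovered local checkpoint.
    DirectoryReader allDocsReader = Lucene.wrapAllDocsLive(directoryReader);
    IndexSearcher searcher = new IndexSearcher(allDocsReader);
    searcher.setQueryCache(null); // a one-shot scan gains nothing from query caching
    Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, localCheckpoint + 1, Long.MAX_VALUE);
    Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);

The trailing refresh is what the "remove live entries" comment refers to: the live version map listens for refreshes, and once the restored documents are searchable it drops the non-tombstone entries, leaving only delete tombstones in the map.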

+0 -62  server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

@@ -40,14 +40,12 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
-import org.elasticsearch.index.mapper.VersionFieldMapper;
 import org.elasticsearch.index.translog.Translog;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -305,64 +303,4 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
         }
     }
 
-    private static final class CombinedDocValues {
-        private final NumericDocValues versionDV;
-        private final NumericDocValues seqNoDV;
-        private final NumericDocValues primaryTermDV;
-        private final NumericDocValues tombstoneDV;
-        private final NumericDocValues recoverySource;
-
-        CombinedDocValues(LeafReader leafReader) throws IOException {
-            this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
-            this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
-            this.primaryTermDV = Objects.requireNonNull(
-                leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
-            this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
-            this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
-        }
-
-        long docVersion(int segmentDocId) throws IOException {
-            assert versionDV.docID() < segmentDocId;
-            if (versionDV.advanceExact(segmentDocId) == false) {
-                throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
-            }
-            return versionDV.longValue();
-        }
-
-        long docSeqNo(int segmentDocId) throws IOException {
-            assert seqNoDV.docID() < segmentDocId;
-            if (seqNoDV.advanceExact(segmentDocId) == false) {
-                throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
-            }
-            return seqNoDV.longValue();
-        }
-
-        long docPrimaryTerm(int segmentDocId) throws IOException {
-            if (primaryTermDV == null) {
-                return -1L;
-            }
-            assert primaryTermDV.docID() < segmentDocId;
-            // Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
-            if (primaryTermDV.advanceExact(segmentDocId) == false) {
-                return -1;
-            }
-            return primaryTermDV.longValue();
-        }
-
-        boolean isTombstone(int segmentDocId) throws IOException {
-            if (tombstoneDV == null) {
-                return false;
-            }
-            assert tombstoneDV.docID() < segmentDocId;
-            return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
-        }
-
-        boolean hasRecoverySource(int segmentDocId) throws IOException {
-            if (recoverySource == null) {
-                return false;
-            }
-            assert recoverySource.docID() < segmentDocId;
-            return recoverySource.advanceExact(segmentDocId);
-        }
-    }
 }

+60 -0  server/src/main/java/org/elasticsearch/index/fieldvisitor/IdOnlyFieldVisitor.java

@@ -0,0 +1,60 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.fieldvisitor;
+
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.StoredFieldVisitor;
+import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
+
+public final class IdOnlyFieldVisitor extends StoredFieldVisitor {
+    private String id = null;
+    private boolean visited = false;
+
+    @Override
+    public Status needsField(FieldInfo fieldInfo) {
+        if (visited) {
+            return Status.STOP;
+        }
+        if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
+            visited = true;
+            return Status.YES;
+        } else {
+            return Status.NO;
+        }
+    }
+
+    @Override
+    public void binaryField(FieldInfo fieldInfo, byte[] value) {
+        assert IdFieldMapper.NAME.equals(fieldInfo.name) : fieldInfo;
+        if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
+            id = Uid.decodeId(value);
+        }
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void reset() {
+        id = null;
+        visited = false;
+    }
+}
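
A small usage sketch for the visitor above (readDocId is a hypothetical helper; the reader and docId come from the caller): the visitor is reusable across documents as long as reset() is called between them, and getId() returns null for documents without a stored _id, such as noop tombstones:

    // Read only the _id stored field of one document, skipping all other stored fields.
    static String readDocId(LeafReader reader, int docId, IdOnlyFieldVisitor visitor) throws IOException {
        visitor.reset();                 // clear the id/visited state from the previous doc
        reader.document(docId, visitor); // needsField() stops the walk once _id was loaded
        return visitor.getId();          // null if this doc has no _id stored field
    }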

+79 -16  server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -223,7 +223,7 @@ public class InternalEngineTests extends EngineTestCase {
         Engine.Index update = indexForDoc(doc);
         engine.index(update);
         assertTrue(engine.isSafeAccessRequired());
-        assertEquals(1, engine.getVersionMapSize());
+        assertThat(engine.getVersionMap().values(), hasSize(1));
         try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
             assertEquals(0, searcher.reader().numDocs());
         }
@@ -255,7 +255,7 @@ public class InternalEngineTests extends EngineTestCase {
             : appendOnlyReplica(doc, false, 1, generateNewSeqNo(engine));
         engine.index(operation);
         assertTrue("safe access should be required", engine.isSafeAccessRequired());
-        assertEquals(1, engine.getVersionMapSize()); // now we add this to the map
+        assertThat(engine.getVersionMap().values(), hasSize(1)); // now we add this to the map
         engine.refresh("test");
         if (randomBoolean()) { // randomly refresh here again
             engine.refresh("test");
@@ -3869,7 +3869,7 @@ public class InternalEngineTests extends EngineTestCase {
                     } catch (InterruptedException e) {
                         throw new AssertionError(e);
                     }
-                    assertEquals(0, engine.getVersionMapSize());
+                    assertThat(engine.getVersionMap().values(), empty());
                     int docOffset;
                     while ((docOffset = offset.incrementAndGet()) < docs.size()) {
                         try {
@@ -5168,18 +5168,19 @@ public class InternalEngineTests extends EngineTestCase {
                     engine.delete(replicaDeleteForDoc(UUIDs.randomBase64UUID(), 1, seqno, threadPool.relativeTimeInMillis()));
                 }
             }
-            List<DeleteVersionValue> tombstones = new ArrayList<>(engine.getDeletedTombstones());
+
+            List<DeleteVersionValue> tombstones = new ArrayList<>(tombstonesInVersionMap(engine).values());
             engine.config().setEnableGcDeletes(true);
             // Prune tombstones whose seqno < gap_seqno and timestamp < clock-gcInterval.
             clock.set(randomLongBetween(gcInterval, deleteBatch + gcInterval));
             engine.refresh("test");
             tombstones.removeIf(v -> v.seqNo < gapSeqNo && v.time < clock.get() - gcInterval);
-            assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray()));
+            assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray()));
             // Prune tombstones whose seqno at most the local checkpoint (eg. seqno < gap_seqno).
             clock.set(randomLongBetween(deleteBatch + gcInterval * 4/3, 100)); // Need a margin for gcInterval/4.
             engine.refresh("test");
             tombstones.removeIf(v -> v.seqNo < gapSeqNo);
-            assertThat(engine.getDeletedTombstones(), containsInAnyOrder(tombstones.toArray()));
+            assertThat(tombstonesInVersionMap(engine).values(), containsInAnyOrder(tombstones.toArray()));
             // Fill the seqno gap - should prune all tombstones.
             clock.set(between(0, 100));
             if (randomBoolean()) {
@@ -5191,7 +5192,7 @@ public class InternalEngineTests extends EngineTestCase {
             }
             clock.set(randomLongBetween(100 + gcInterval * 4/3, Long.MAX_VALUE)); // Need a margin for gcInterval/4.
             engine.refresh("test");
-            assertThat(engine.getDeletedTombstones(), empty());
+            assertThat(tombstonesInVersionMap(engine).values(), empty());
         }
     }
 
@@ -5605,9 +5606,10 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
-    public void testRebuildLocalCheckpointTracker() throws Exception {
+    public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception {
         Settings.Builder settings = Settings.builder()
             .put(defaultSettings.getSettings())
+            .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000)
             .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
         final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
         final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
@@ -5624,32 +5626,54 @@ public class InternalEngineTests extends EngineTestCase {
                 for (Engine.Operation op : operations) {
                     flushedOperations.add(op);
                     applyOperation(engine, op);
+                    if (randomBoolean()) {
+                        globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
+                        engine.syncTranslog();
+                    }
                     if (randomInt(100) < 10) {
                         engine.refresh("test");
                     }
                     if (randomInt(100) < 5) {
-                        engine.flush();
+                        engine.flush(true, true);
+                        flushedOperations.sort(Comparator.comparing(Engine.Operation::seqNo));
                         commits.add(new ArrayList<>(flushedOperations));
-                        globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
                     }
                 }
-                globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
-                engine.syncTranslog();
                 docs = getDocIds(engine, true);
             }
-            Set<Long> seqNosInSafeCommit = null;
+            List<Engine.Operation> operationsInSafeCommit = null;
             for (int i = commits.size() - 1; i >= 0; i--) {
                 if (commits.get(i).stream().allMatch(op -> op.seqNo() <= globalCheckpoint.get())) {
-                    seqNosInSafeCommit = commits.get(i).stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
+                    operationsInSafeCommit = commits.get(i);
                     break;
                 }
             }
-            assertThat(seqNosInSafeCommit, notNullValue());
+            assertThat(operationsInSafeCommit, notNullValue());
             try (InternalEngine engine = new InternalEngine(config)) { // do not recover from translog
+                final Map<BytesRef, Engine.Operation> deletesAfterCheckpoint = new HashMap<>();
+                for (Engine.Operation op : operationsInSafeCommit) {
+                    if (op instanceof Engine.NoOp == false && op.seqNo() > engine.getLocalCheckpoint()) {
+                        deletesAfterCheckpoint.put(new Term(IdFieldMapper.NAME, Uid.encodeId(op.id())).bytes(), op);
+                    }
+                }
+                deletesAfterCheckpoint.values().removeIf(o -> o instanceof Engine.Delete == false);
+                final Map<BytesRef, VersionValue> versionMap = engine.getVersionMap();
+                for (BytesRef uid : deletesAfterCheckpoint.keySet()) {
+                    final VersionValue versionValue = versionMap.get(uid);
+                    final Engine.Operation op = deletesAfterCheckpoint.get(uid);
+                    final String msg = versionValue + " vs " +
+                        "op[" + op.operationType() + " id=" + op.id() + " seqno=" + op.seqNo() + " term=" + op.primaryTerm() + "]";
+                    assertThat(versionValue, instanceOf(DeleteVersionValue.class));
+                    assertThat(msg, versionValue.seqNo, equalTo(op.seqNo()));
+                    assertThat(msg, versionValue.term, equalTo(op.primaryTerm()));
+                    assertThat(msg, versionValue.version, equalTo(op.version()));
+                }
+                assertThat(versionMap.keySet(), equalTo(deletesAfterCheckpoint.keySet()));
                 final LocalCheckpointTracker tracker = engine.getLocalCheckpointTracker();
+                final Set<Long> seqNosInSafeCommit = operationsInSafeCommit.stream().map(op -> op.seqNo()).collect(Collectors.toSet());
                 for (Engine.Operation op : operations) {
                     assertThat(
-                        "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(),
+                        "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + " checkpoint=" + tracker.getCheckpoint(),
                         tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo())));
                 }
                 engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
@@ -5918,4 +5942,43 @@ public class InternalEngineTests extends EngineTestCase {
             }
         }
     }
+
+    public void testRecoverFromLocalTranslog() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        Path translogPath = createTempDir();
+        List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean());
+        try (Store store = createStore()) {
+            EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get);
+            final List<DocIdSeqNoAndSource> docs;
+            try (InternalEngine engine = createEngine(config)) {
+                for (Engine.Operation op : operations) {
+                    applyOperation(engine, op);
+                    if (randomBoolean()) {
+                        globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
+                        engine.syncTranslog();
+                    }
+                    if (randomInt(100) < 10) {
+                        engine.refresh("test");
+                    }
+                    if (randomInt(100) < 5) {
+                        engine.flush();
+                    }
+                    if (randomInt(100) < 5) {
+                        engine.forceMerge(randomBoolean(), 1, false, false, false);
+                    }
+                }
+                docs = getDocIds(engine, true);
+            }
+            try (InternalEngine engine = new InternalEngine(config)) {
+                engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
+                assertThat(getDocIds(engine, randomBoolean()), equalTo(docs));
+            }
+        }
+    }
+
+    private Map<BytesRef, DeleteVersionValue> tombstonesInVersionMap(InternalEngine engine) {
+        return engine.getVersionMap().entrySet().stream()
+            .filter(e -> e.getValue() instanceof DeleteVersionValue)
+            .collect(Collectors.toMap(e -> e.getKey(), e -> (DeleteVersionValue) e.getValue()));
+    }
 }

+4 -0  server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

@@ -26,6 +26,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.bulk.BulkShardRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
@@ -725,6 +726,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
                             if (randomInt(100) < 10) {
                                 shards.getPrimary().flush(new FlushRequest());
                             }
+                            if (randomInt(100) < 5) {
+                                shards.getPrimary().forceMerge(new ForceMergeRequest().flush(randomBoolean()).maxNumSegments(1));
+                            }
                         } catch (Exception ex) {
                             throw new AssertionError(ex);
                         }

+13 -3  test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -37,6 +37,7 @@ import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.ReferenceManager;
@@ -1111,10 +1112,8 @@ public abstract class EngineTestCase extends ESTestCase {
         List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
         for (IndexCommit commit : commits) {
             try (DirectoryReader reader = DirectoryReader.open(commit)) {
-                AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
-                Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
                 assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
-                    greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
+                    greaterThanOrEqualTo(maxSeqNosInReader(reader)));
             }
         }
     }
@@ -1177,4 +1176,15 @@ public abstract class EngineTestCase extends ESTestCase {
             return get();
         }
     }
+
+    static long maxSeqNosInReader(DirectoryReader reader) throws IOException {
+        long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        for (LeafReaderContext leaf : reader.leaves()) {
+            final NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
+            while (seqNoDocValues.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNoDocValues.longValue());
+            }
+        }
+        return maxSeqNo;
+    }
 }
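
One assumption worth noting in maxSeqNosInReader: every leaf is expected to carry _seq_no doc values (getNumericDocValues would return null otherwise), which holds for the engine-created indices these tests read but makes the helper unsuitable for arbitrary readers.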

+3 -0  x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java

@@ -214,6 +214,9 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
                     if (rarely()) {
                         followerClient().admin().indices().prepareFlush("follower-index").get();
                     }
+                    if (rarely()) {
+                        followerClient().admin().indices().prepareForceMerge("follower-index").setMaxNumSegments(1).get();
+                    }
                     if (rarely()) {
                         followerClient().admin().indices().prepareRefresh("follower-index").get();
                     }