|
@@ -151,32 +151,33 @@ public final class FollowingEngine extends InternalEngine {
|
|
|
}
|
|
|
|
|
|
private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException {
|
|
|
+ // Don't need to look up term for operations before the global checkpoint for they were processed on every copies already.
|
|
|
+ if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
|
|
|
+ return OptionalLong.empty();
|
|
|
+ }
|
|
|
refreshIfNeeded("lookup_primary_term", seqNo);
|
|
|
try (Searcher engineSearcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) {
|
|
|
- // We have to acquire a searcher before execute this check to ensure that the requesting seq_no is always found in the else
|
|
|
- // branch. If the operation is at most the global checkpoint, we should not look up its term as we may have merged away the
|
|
|
- // operation. Moreover, we won't need to replicate this operation to replicas since it was processed on every copies already.
|
|
|
+ final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
|
|
|
+ final IndexSearcher searcher = new IndexSearcher(reader);
|
|
|
+ searcher.setQueryCache(null);
|
|
|
+ final Query query = new BooleanQuery.Builder()
|
|
|
+ .add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER)
|
|
|
+ // excludes the non-root nested documents which don't have primary_term.
|
|
|
+ .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
|
|
|
+ .build();
|
|
|
+ final TopDocs topDocs = searcher.search(query, 1);
|
|
|
+ if (topDocs.scoreDocs.length == 1) {
|
|
|
+ final int docId = topDocs.scoreDocs[0].doc;
|
|
|
+ final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves()));
|
|
|
+ final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
|
|
+ if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) {
|
|
|
+ assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]";
|
|
|
+ return OptionalLong.of(primaryTermDV.longValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
|
|
|
- return OptionalLong.empty();
|
|
|
+ return OptionalLong.empty(); // we have merged away the looking up operation.
|
|
|
} else {
|
|
|
- final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
|
|
|
- final IndexSearcher searcher = new IndexSearcher(reader);
|
|
|
- searcher.setQueryCache(null);
|
|
|
- final Query query = new BooleanQuery.Builder()
|
|
|
- .add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER)
|
|
|
- // excludes the non-root nested documents which don't have primary_term.
|
|
|
- .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
|
|
|
- .build();
|
|
|
- final TopDocs topDocs = searcher.search(query, 1);
|
|
|
- if (topDocs.scoreDocs.length == 1) {
|
|
|
- final int docId = topDocs.scoreDocs[0].doc;
|
|
|
- final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves()));
|
|
|
- final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
|
|
- if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) {
|
|
|
- assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]";
|
|
|
- return OptionalLong.of(primaryTermDV.longValue());
|
|
|
- }
|
|
|
- }
|
|
|
assert false : "seq_no[" + seqNo + "] does not have primary_term, total_hits=[" + topDocs.totalHits + "]";
|
|
|
throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term (total_hits=" + topDocs.totalHits + ")");
|
|
|
}
|