|
@@ -24,6 +24,7 @@ import org.apache.lucene.search.SortField;
|
|
import org.apache.lucene.search.TopDocs;
|
|
import org.apache.lucene.search.TopDocs;
|
|
import org.apache.lucene.search.TopFieldCollector;
|
|
import org.apache.lucene.search.TopFieldCollector;
|
|
import org.apache.lucene.util.ArrayUtil;
|
|
import org.apache.lucene.util.ArrayUtil;
|
|
|
|
+import org.elasticsearch.Version;
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
import org.elasticsearch.common.lucene.Lucene;
|
|
import org.elasticsearch.common.lucene.Lucene;
|
|
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
|
|
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
|
|
@@ -62,6 +63,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
|
|
private final ParallelArray parallelArray;
|
|
private final ParallelArray parallelArray;
|
|
private final Closeable onClose;
|
|
private final Closeable onClose;
|
|
|
|
|
|
|
|
+ private final Version indexVersionCreated;
|
|
|
|
+
|
|
private int storedFieldsReaderOrd = -1;
|
|
private int storedFieldsReaderOrd = -1;
|
|
private StoredFieldsReader storedFieldsReader = null;
|
|
private StoredFieldsReader storedFieldsReader = null;
|
|
|
|
|
|
@@ -77,6 +80,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
|
|
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
|
|
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
|
|
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
|
|
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
|
|
* @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()}
|
|
* @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()}
|
|
|
|
+ * @param indexVersionCreated the version on which this index was created
|
|
*/
|
|
*/
|
|
LuceneChangesSnapshot(
|
|
LuceneChangesSnapshot(
|
|
Engine.Searcher engineSearcher,
|
|
Engine.Searcher engineSearcher,
|
|
@@ -85,7 +89,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
|
|
long toSeqNo,
|
|
long toSeqNo,
|
|
boolean requiredFullRange,
|
|
boolean requiredFullRange,
|
|
boolean singleConsumer,
|
|
boolean singleConsumer,
|
|
- boolean accessStats
|
|
|
|
|
|
+ boolean accessStats,
|
|
|
|
+ Version indexVersionCreated
|
|
) throws IOException {
|
|
) throws IOException {
|
|
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
|
|
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
|
|
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
|
|
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
|
|
@@ -111,6 +116,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
|
|
this.indexSearcher.setQueryCache(null);
|
|
this.indexSearcher.setQueryCache(null);
|
|
this.accessStats = accessStats;
|
|
this.accessStats = accessStats;
|
|
this.parallelArray = new ParallelArray(this.searchBatchSize);
|
|
this.parallelArray = new ParallelArray(this.searchBatchSize);
|
|
|
|
+ this.indexVersionCreated = indexVersionCreated;
|
|
final TopDocs topDocs = searchOperations(null, accessStats);
|
|
final TopDocs topDocs = searchOperations(null, accessStats);
|
|
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
|
|
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
|
|
this.scoreDocs = topDocs.scoreDocs;
|
|
this.scoreDocs = topDocs.scoreDocs;
|
|
@@ -272,21 +278,22 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
|
|
return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
|
|
return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
|
|
}
|
|
}
|
|
|
|
|
|
- private static Query rangeQuery(long fromSeqNo, long toSeqNo) {
|
|
|
|
|
|
+ private static Query rangeQuery(long fromSeqNo, long toSeqNo, Version indexVersionCreated) {
|
|
return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
|
|
return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
|
|
- .add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents
|
|
|
|
|
|
+ .add(Queries.newNonNestedFilter(indexVersionCreated), BooleanClause.Occur.MUST) // exclude non-root nested documents
|
|
.build();
|
|
.build();
|
|
}
|
|
}
|
|
|
|
|
|
- static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo) throws IOException {
|
|
|
|
|
|
+ static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo, Version indexVersionCreated)
|
|
|
|
+ throws IOException {
|
|
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
|
|
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
|
|
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
|
|
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
|
|
}
|
|
}
|
|
- return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo));
|
|
|
|
|
|
+ return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo, indexVersionCreated));
|
|
}
|
|
}
|
|
|
|
|
|
private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException {
|
|
private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException {
|
|
- final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
|
|
|
|
|
|
+ final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo, indexVersionCreated);
|
|
assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only";
|
|
assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only";
|
|
final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG);
|
|
final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG);
|
|
final TopFieldCollector collector = TopFieldCollector.create(
|
|
final TopFieldCollector collector = TopFieldCollector.create(
|