
Faster sequential access for stored fields (#62509)


Spinoff of #61806
Today, retrieving stored fields at search time is optimized for random access:
we keep no state across lookups, so if two documents fall in the same
compressed block we may decompress that block once per document.
This strategy is acceptable when retrieving the top N hits sorted by score,
since there is no guarantee that those documents end up in the same block.
However, in some use cases the documents to retrieve are completely
sequential:

* Scrolls or normal searches sorted by document id.
* Queries on runtime fields that extract from _source.
This commit exposes a sequential stored fields reader in the custom leaf
readers that we use at search time. That allows us to leverage the merge
instances of stored fields readers, which are optimized for sequential access.
For now this change focuses on the fetch phase and uses the merge instances
only when all documents to retrieve are adjacent; a minimal sketch of the
pattern follows. Applying the same logic to the source lookup of runtime
fields should be trivial but is left to a follow-up.
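For illustration, here is a minimal sketch of the pattern this change introduces in the fetch phase. The types and methods (`SequentialStoredFieldsLeafReader`, `getSequentialStoredFieldsReader()`, `StoredFieldsReader#visitDocument`) come from this commit; the helper class, method name, and the raw `int[]` of doc ids are illustrative assumptions:

```java
import java.io.IOException;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.StoredFieldVisitor;
import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;

final class SequentialFetchSketch {
    // Visit stored fields for a batch of doc ids, assumed sorted ascending and distinct.
    static void visitDocs(LeafReader leaf, int[] docIds, StoredFieldVisitor visitor) throws IOException {
        // Same adjacency test as FetchPhase#hasSequentialDocs in this commit.
        boolean sequential = docIds.length > 0
            && docIds[docIds.length - 1] - docIds[0] == docIds.length - 1;
        if (sequential && docIds.length >= 10 && leaf instanceof SequentialStoredFieldsLeafReader) {
            // The merge instance keeps decompression state between calls, so each
            // compressed block is decompressed at most once while walking the run.
            StoredFieldsReader reader = ((SequentialStoredFieldsLeafReader) leaf).getSequentialStoredFieldsReader();
            for (int docId : docIds) {
                reader.visitDocument(docId, visitor);
            }
        } else {
            // Random-access path: adjacent documents that share a compressed block
            // may cause that block to be decompressed once per document.
            for (int docId : docIds) {
                leaf.document(docId, visitor);
            }
        }
    }
}
```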

The speedup on queries sorted by doc id is significant. I played with the
scroll task of the http_logs Rally track on my laptop and got the following
results:

|                                                        Metric |   Task |    Baseline |   Contender |     Diff |    Unit |
|--------------------------------------------------------------:|-------:|------------:|------------:|---------:|--------:|
|                                            Total Young Gen GC |        |       0.199 |       0.231 |    0.032 |       s |
|                                              Total Old Gen GC |        |           0 |           0 |        0 |       s |
|                                                    Store size |        |     17.9704 |     17.9704 |        0 |      GB |
|                                                 Translog size |        | 2.04891e-06 | 2.04891e-06 |        0 |      GB |
|                                        Heap used for segments |        |    0.820332 |    0.820332 |        0 |      MB |
|                                      Heap used for doc values |        |    0.113979 |    0.113979 |        0 |      MB |
|                                           Heap used for terms |        |     0.37973 |     0.37973 |        0 |      MB |
|                                           Heap used for norms |        |     0.03302 |     0.03302 |        0 |      MB |
|                                          Heap used for points |        |           0 |           0 |        0 |      MB |
|                                   Heap used for stored fields |        |    0.293602 |    0.293602 |        0 |      MB |
|                                                 Segment count |        |         541 |         541 |        0 |         |
|                                                Min Throughput | scroll |     12.7872 |     12.8747 |  0.08758 | pages/s |
|                                             Median Throughput | scroll |     12.9679 |     13.0556 |  0.08776 | pages/s |
|                                                Max Throughput | scroll |     13.4001 |     13.5705 |  0.17046 | pages/s |
|                                       50th percentile latency | scroll |     524.966 |     251.396 |  -273.57 |      ms |
|                                       90th percentile latency | scroll |     577.593 |     271.066 | -306.527 |      ms |
|                                      100th percentile latency | scroll |      664.73 |     272.734 | -391.997 |      ms |
|                                  50th percentile service time | scroll |     522.387 |     248.776 | -273.612 |      ms |
|                                  90th percentile service time | scroll |     573.118 |      267.79 | -305.328 |      ms |
|                                 100th percentile service time | scroll |     660.642 |     268.963 | -391.678 |      ms |
|                                                    error rate | scroll |           0 |           0 |        0 |       % |
Closes #62024
Jim Ferenczi · 5 years ago · commit 6784c4d1b5

+ 2 - 2
server/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchDirectoryReader.java

@@ -62,8 +62,8 @@ public final class ElasticsearchDirectoryReader extends FilterDirectoryReader {
     }
 
     /**
-     * Wraps the given reader in a {@link org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader} as
-     * well as all it's sub-readers in {@link org.elasticsearch.common.lucene.index.ElasticsearchLeafReader} to
+     * Wraps the given reader in a {@link ElasticsearchDirectoryReader} as
+     * well as all its sub-readers in {@link ElasticsearchLeafReader} to
      * expose the given shard Id.
      *
      * @param reader the reader to wrap

+ 7 - 1
server/src/main/java/org/elasticsearch/common/lucene/index/ElasticsearchLeafReader.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.common.lucene.index;
 
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.index.FilterLeafReader;
 import org.apache.lucene.index.LeafReader;
 import org.elasticsearch.index.shard.ShardId;
@@ -26,7 +27,7 @@ import org.elasticsearch.index.shard.ShardId;
  * A {@link org.apache.lucene.index.FilterLeafReader} that exposes
  * Elasticsearch internal per shard / index information like the shard ID.
  */
-public final class ElasticsearchLeafReader extends FilterLeafReader {
+public final class ElasticsearchLeafReader extends SequentialStoredFieldsLeafReader {
 
     private final ShardId shardId;
 
@@ -72,4 +73,9 @@ public final class ElasticsearchLeafReader extends FilterLeafReader {
         }
         return null;
     }
+
+    @Override
+    protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
+        return reader;
+    }
 }

+ 68 - 0
server/src/main/java/org/elasticsearch/common/lucene/index/SequentialStoredFieldsLeafReader.java

@@ -0,0 +1,68 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.lucene.index;
+
+import org.apache.lucene.codecs.StoredFieldsReader;
+import org.apache.lucene.index.CodecReader;
+import org.apache.lucene.index.FilterLeafReader;
+import org.apache.lucene.index.LeafReader;
+
+import java.io.IOException;
+
+/**
+ * A {@link FilterLeafReader} that exposes a {@link StoredFieldsReader}
+ * optimized for sequential access. This class should be used by custom
+ * {@link FilterLeafReader}s used at search time in order to
+ * leverage sequential access when retrieving stored fields in queries,
+ * aggregations or during the fetch phase.
+ */
+public abstract class SequentialStoredFieldsLeafReader extends FilterLeafReader {
+    /**
+     * <p>Construct a SequentialStoredFieldsLeafReader based on the specified base reader.
+     * <p>Note that the base reader is closed if this FilterLeafReader is closed.</p>
+     *
+     * @param in specified base reader.
+     */
+    public SequentialStoredFieldsLeafReader(LeafReader in) {
+        super(in);
+    }
+
+    /**
+     * Implementations should return a {@link StoredFieldsReader} that wraps the provided <code>reader</code>
+     * that is optimized for sequential access (adjacent doc ids).
+     */
+    protected abstract StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader);
+
+    /**
+     * Returns a {@link StoredFieldsReader} optimized for sequential access (adjacent doc ids).
+     */
+    public StoredFieldsReader getSequentialStoredFieldsReader() throws IOException {
+        if (in instanceof CodecReader) {
+            CodecReader reader = (CodecReader) in;
+            return doGetSequentialStoredFieldsReader(reader.getFieldsReader().getMergeInstance());
+        } else if (in instanceof SequentialStoredFieldsLeafReader) {
+            SequentialStoredFieldsLeafReader reader = (SequentialStoredFieldsLeafReader) in;
+            return doGetSequentialStoredFieldsReader(reader.getSequentialStoredFieldsReader());
+        } else {
+            throw new IOException("requires a CodecReader or a SequentialStoredFieldsLeafReader, got " + in.getClass());
+        }
+    }
+
+}
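A note on the design: `getSequentialStoredFieldsReader()` recurses through stacked filter readers until it reaches the segment's `CodecReader`, then asks the codec for a merge instance, and each level may wrap the result via `doGetSequentialStoredFieldsReader`. A hedged sketch of a typical search-time chain from this commit (variable names are illustrative):

```java
// leaf is one LeafReaderContext's reader at search time, e.g.
// ExitableLeafReader(ElasticsearchLeafReader(codecReader)).
if (leaf instanceof SequentialStoredFieldsLeafReader) {
    SequentialStoredFieldsLeafReader seq = (SequentialStoredFieldsLeafReader) leaf;
    // Unwraps level by level; the base case returns
    // codecReader.getFieldsReader().getMergeInstance(). Wrappers that do not
    // transform stored fields return it unchanged, while FieldSubsetReader
    // (below) wraps it so field filtering also applies on the sequential path.
    StoredFieldsReader sequential = seq.getSequentialStoredFieldsReader();
    sequential.visitDocument(docId, visitor); // repeat for adjacent doc ids
}
```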

+ 42 - 8
server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

@@ -30,9 +30,11 @@ import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.TotalHits;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.BitSet;
+import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -100,6 +102,7 @@ public class FetchPhase {
         for (int index = 0; index < context.docIdsToLoadSize(); index++) {
             docs[index] = new DocIdToIndex(context.docIdsToLoad()[context.docIdsToLoadFrom() + index], index);
         }
+        // make sure that we iterate in doc id order
         Arrays.sort(docs);
 
         Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
@@ -114,6 +117,8 @@ public class FetchPhase {
 
         int currentReaderIndex = -1;
         LeafReaderContext currentReaderContext = null;
+        CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader = null;
+        boolean hasSequentialDocs = hasSequentialDocs(docs);
         for (int index = 0; index < context.docIdsToLoadSize(); index++) {
             if (context.isCancelled()) {
                 throw new TaskCancelledException("cancelled");
@@ -124,6 +129,17 @@ public class FetchPhase {
                 if (currentReaderIndex != readerIndex) {
                     currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
                     currentReaderIndex = readerIndex;
+                    if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader
+                            && hasSequentialDocs && docs.length >= 10) {
+                        // All the docs to fetch are adjacent but Lucene stored fields are optimized
+                        // for random access and don't optimize for sequential access - except for merging.
+                        // So we do a little hack here and pretend we're going to do merges in order to
+                        // get better sequential access.
+                        SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) currentReaderContext.reader();
+                        fieldReader = lf.getSequentialStoredFieldsReader()::visitDocument;
+                    } else {
+                        fieldReader = currentReaderContext.reader()::document;
+                    }
                     for (FetchSubPhaseProcessor processor : processors) {
                         processor.setNextReader(currentReaderContext);
                     }
@@ -136,6 +152,7 @@ public class FetchPhase {
                     docId,
                     storedToRequestedFields,
                     currentReaderContext,
+                    fieldReader,
                     sharedCache
                 );
                 for (FetchSubPhaseProcessor processor : processors) {
@@ -249,9 +266,14 @@ public class FetchPhase {
         return -1;
     }
 
-    private HitContext prepareHitContext(SearchContext context, SearchLookup lookup, FieldsVisitor fieldsVisitor, int docId,
+    private HitContext prepareHitContext(SearchContext context,
+                                         SearchLookup lookup,
+                                         FieldsVisitor fieldsVisitor,
+                                         int docId,
                                          Map<String, Set<String>> storedToRequestedFields,
-                                         LeafReaderContext subReaderContext, Map<String, Object> sharedCache) throws IOException {
+                                         LeafReaderContext subReaderContext,
+                                         CheckedBiConsumer<Integer, FieldsVisitor, IOException> storedFieldReader,
+                                         Map<String, Object> sharedCache) throws IOException {
         int rootDocId = findRootDocumentIfNested(context, subReaderContext, docId - subReaderContext.docBase);
         if (rootDocId == -1) {
             return prepareNonNestedHitContext(
@@ -261,10 +283,12 @@ public class FetchPhase {
                 docId,
                 storedToRequestedFields,
                 subReaderContext,
+                storedFieldReader,
                 sharedCache
             );
         } else {
-            return prepareNestedHitContext(context, docId, rootDocId, storedToRequestedFields, subReaderContext, sharedCache);
+            return prepareNestedHitContext(context, docId, rootDocId, storedToRequestedFields,
+                subReaderContext, storedFieldReader, sharedCache);
         }
     }
 
@@ -281,6 +305,7 @@ public class FetchPhase {
                                    int docId,
                                    Map<String, Set<String>> storedToRequestedFields,
                                    LeafReaderContext subReaderContext,
+                                   CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader,
                                    Map<String, Object> sharedCache) throws IOException {
         int subDocId = docId - subReaderContext.docBase;
         if (fieldsVisitor == null) {
@@ -288,7 +313,7 @@ public class FetchPhase {
             return new HitContext(hit, subReaderContext, subDocId, lookup.source(), sharedCache);
         } else {
             SearchHit hit;
-            loadStoredFields(context.mapperService(), subReaderContext, fieldsVisitor, subDocId);
+            loadStoredFields(context.mapperService(), fieldReader, fieldsVisitor, subDocId);
             if (fieldsVisitor.fields().isEmpty() == false) {
                 Map<String, DocumentField> docFields = new HashMap<>();
                 Map<String, DocumentField> metaFields = new HashMap<>();
@@ -320,6 +345,7 @@ public class FetchPhase {
                                                int rootDocId,
                                                Map<String, Set<String>> storedToRequestedFields,
                                                LeafReaderContext subReaderContext,
+                                               CheckedBiConsumer<Integer, FieldsVisitor, IOException> storedFieldReader,
                                                Map<String, Object> sharedCache) throws IOException {
         // Also if highlighting is requested on nested documents we need to fetch the _source from the root document,
         // otherwise highlighting will attempt to fetch the _source from the nested doc, which will fail,
@@ -343,7 +369,7 @@ public class FetchPhase {
             }
         } else {
             FieldsVisitor rootFieldsVisitor = new FieldsVisitor(needSource);
-            loadStoredFields(context.mapperService(), subReaderContext, rootFieldsVisitor, rootDocId);
+            loadStoredFields(context.mapperService(), storedFieldReader, rootFieldsVisitor, rootDocId);
             rootId = rootFieldsVisitor.id();
 
             if (needSource) {
@@ -358,7 +384,7 @@ public class FetchPhase {
         Map<String, DocumentField> metaFields = emptyMap();
         if (context.hasStoredFields() && !context.storedFieldsContext().fieldNames().isEmpty()) {
             FieldsVisitor nestedFieldsVisitor = new CustomFieldsVisitor(storedToRequestedFields.keySet(), false);
-            loadStoredFields(context.mapperService(), subReaderContext, nestedFieldsVisitor, nestedDocId);
+            loadStoredFields(context.mapperService(), storedFieldReader, nestedFieldsVisitor, nestedDocId);
             if (nestedFieldsVisitor.fields().isEmpty() == false) {
                 docFields = new HashMap<>();
                 metaFields = new HashMap<>();
@@ -493,10 +519,10 @@ public class FetchPhase {
     }
 
     private void loadStoredFields(MapperService mapperService,
-                                  LeafReaderContext readerContext,
+                                  CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader,
                                   FieldsVisitor fieldVisitor, int docId) throws IOException {
         fieldVisitor.reset();
-        readerContext.reader().document(docId, fieldVisitor);
+        fieldReader.accept(docId, fieldVisitor);
         fieldVisitor.postProcess(mapperService);
     }
 
@@ -523,4 +549,12 @@ public class FetchPhase {
             }
         }
     }
+
+    /**
+     * Returns <code>true</code> if the provided <code>docs</code> are
+     * stored sequentially (D[n] = D[n-1] + 1).
+     */
+    static boolean hasSequentialDocs(DocIdToIndex[] docs) {
+        return docs.length > 0 && docs[docs.length-1].docId - docs[0].docId == docs.length - 1;
+    }
 }
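One extra note on the constant-time check above: `FetchPhase` sorts the doc ids ascending and they are distinct, so the batch is adjacent exactly when the span between the first and last id equals `length - 1`. A small hedged sketch of the same invariant on raw ids (the method name is illustrative, not part of the change):

```java
// Same invariant as FetchPhase#hasSequentialDocs, on sorted, distinct doc ids.
static boolean isAdjacentRun(int[] sortedDistinctDocIds) {
    int n = sortedDistinctDocIds.length;
    // n distinct ascending ids span exactly n - 1 if and only if they are consecutive.
    return n > 0 && sortedDistinctDocIds[n - 1] - sortedDistinctDocIds[0] == n - 1;
}

// isAdjacentRun(new int[] {5, 6, 7, 8}) -> true  (eligible for the merge instance)
// isAdjacentRun(new int[] {5, 6, 8, 9}) -> false (gap at 7, random access is used)
```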

+ 8 - 1
server/src/main/java/org/elasticsearch/search/internal/ExitableDirectoryReader.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.search.internal;
 
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterDirectoryReader;
 import org.apache.lucene.index.FilterLeafReader;
@@ -30,6 +31,7 @@ import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.search.suggest.document.CompletionTerms;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.automaton.CompiledAutomaton;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 
 import java.io.IOException;
 
@@ -79,7 +81,7 @@ class ExitableDirectoryReader extends FilterDirectoryReader {
     /**
      * Wraps a {@link FilterLeafReader} with a {@link QueryCancellation}.
      */
-    static class ExitableLeafReader extends FilterLeafReader {
+    static class ExitableLeafReader extends SequentialStoredFieldsLeafReader {
 
         private final QueryCancellation queryCancellation;
 
@@ -119,6 +121,11 @@ class ExitableDirectoryReader extends FilterDirectoryReader {
         public CacheHelper getReaderCacheHelper() {
             return in.getReaderCacheHelper();
         }
+
+        @Override
+        protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
+            return reader;
+        }
     }
 
     /**

+ 22 - 0
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -91,6 +91,7 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
@@ -5940,4 +5941,25 @@ public class InternalEngineTests extends EngineTestCase {
             }
         }
     }
+
+    public void testProducesStoredFieldsReader() throws Exception {
+        // Make sure that the engine produces a SequentialStoredFieldsLeafReader.
+        // This is required for optimizations on SourceLookup to work, which is in turn useful for runtime fields.
+        ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField("test"),
+            new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+        Engine.Index operation = randomBoolean() ?
+            appendOnlyPrimary(doc, false, 1)
+            : appendOnlyReplica(doc, false, 1, randomIntBetween(0, 5));
+        engine.index(operation);
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            IndexReader reader = searcher.getIndexReader();
+            assertThat(reader.leaves().size(), Matchers.greaterThanOrEqualTo(1));
+            for (LeafReaderContext context: reader.leaves()) {
+                assertThat(context.reader(), Matchers.instanceOf(SequentialStoredFieldsLeafReader.class));
+                SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) context.reader();
+                assertNotNull(lf.getSequentialStoredFieldsReader());
+            }
+        }
+    }
 }

+ 42 - 0
server/src/test/java/org/elasticsearch/search/fetch/FetchPhaseTests.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.fetch;
+
+import org.elasticsearch.test.ESTestCase;
+
+public class FetchPhaseTests extends ESTestCase {
+    public void testSequentialDocs() {
+        FetchPhase.DocIdToIndex[] docs = new FetchPhase.DocIdToIndex[10];
+        int start = randomIntBetween(0, Short.MAX_VALUE);
+        for (int i = 0; i < 10; i++) {
+            docs[i] = new FetchPhase.DocIdToIndex(start, i);
+            ++ start;
+        }
+        assertTrue(FetchPhase.hasSequentialDocs(docs));
+
+        int from = randomIntBetween(0, 9);
+        start = docs[from].docId;
+        for (int i = from; i < 10; i++) {
+            start += randomIntBetween(2, 10);
+            docs[i] = new FetchPhase.DocIdToIndex(start, i);
+        }
+        assertFalse(FetchPhase.hasSequentialDocs(docs));
+    }
+}

+ 13 - 2
server/src/test/java/org/elasticsearch/search/internal/ContextIndexSearcherTests.java

@@ -20,13 +20,13 @@
 package org.elasticsearch.search.internal;
 
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.IntPoint;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterDirectoryReader;
-import org.apache.lucene.index.FilterLeafReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -61,6 +61,7 @@ import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.SparseFixedBitSet;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
@@ -242,6 +243,11 @@ public class ContextIndexSearcherTests extends ESTestCase {
         ContextIndexSearcher searcher = new ContextIndexSearcher(filteredReader, IndexSearcher.getDefaultSimilarity(),
             IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), true);
 
+        for (LeafReaderContext context : searcher.getIndexReader().leaves()) {
+            assertThat(context.reader(), instanceOf(SequentialStoredFieldsLeafReader.class));
+            SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) context.reader();
+            assertNotNull(lf.getSequentialStoredFieldsReader());
+        }
         // Assert wrapping
         assertEquals(ExitableDirectoryReader.class, searcher.getIndexReader().getClass());
         for (LeafReaderContext lrc : searcher.getIndexReader().leaves()) {
@@ -312,7 +318,7 @@ public class ContextIndexSearcherTests extends ESTestCase {
         }
     }
 
-    private static class DocumentSubsetReader extends FilterLeafReader {
+    private static class DocumentSubsetReader extends SequentialStoredFieldsLeafReader {
         private final BitSet roleQueryBits;
         private final int numDocs;
 
@@ -357,6 +363,11 @@ public class ContextIndexSearcherTests extends ESTestCase {
             }
         }
 
+        @Override
+        protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
+            return reader;
+        }
+
         private static int computeNumDocs(LeafReader reader, BitSet roleQueryBits) {
             final Bits liveDocs = reader.getLiveDocs();
             if (roleQueryBits == null) {

+ 8 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SeqIdGeneratingFilterReader.java

@@ -5,14 +5,15 @@
  */
 package org.elasticsearch.snapshots;
 
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterDirectoryReader;
-import org.apache.lucene.index.FilterLeafReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.Terms;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.VersionFieldMapper;
 
@@ -108,7 +109,12 @@ final class SeqIdGeneratingFilterReader extends FilterDirectoryReader {
         public LeafReader wrap(LeafReader reader) {
             LeafReaderContext leafReaderContext = ctxMap.get(reader);
             final int docBase = leafReaderContext.docBase;
-            return new FilterLeafReader(reader) {
+            return new SequentialStoredFieldsLeafReader(reader) {
+
+                @Override
+                protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
+                    return reader;
+                }
 
                 @Override
                 public NumericDocValues getNumericDocValues(String field) throws IOException {

+ 8 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReader.java

@@ -5,9 +5,9 @@
  */
 package org.elasticsearch.xpack.core.security.authz.accesscontrol;
 
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FilterDirectoryReader;
-import org.apache.lucene.index.FilterLeafReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.search.DocIdSetIterator;
@@ -22,6 +22,7 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.cache.Cache;
 import org.elasticsearch.common.cache.CacheBuilder;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -32,7 +33,7 @@ import java.util.concurrent.ExecutionException;
 /**
  * A reader that only exposes documents via {@link #getLiveDocs()} that matches with the provided role query.
  */
-public final class DocumentSubsetReader extends FilterLeafReader {
+public final class DocumentSubsetReader extends SequentialStoredFieldsLeafReader {
 
     public static DocumentSubsetDirectoryReader wrap(DirectoryReader in, DocumentSubsetBitsetCache bitsetCache,
             Query roleQuery) throws IOException {
@@ -222,4 +223,9 @@ public final class DocumentSubsetReader extends FilterLeafReader {
         // Not delegated since we change the live docs
         return null;
     }
+
+    @Override
+    protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
+        return reader;
+    }
 }

+ 104 - 45
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/FieldSubsetReader.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.core.security.authz.accesscontrol;
 
+import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.index.BinaryDocValues;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FieldInfo;
@@ -29,6 +30,7 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -48,7 +50,7 @@ import java.util.Map;
  * of fields from the underlying wrapped reader.
  */
 // based on lucene/test-framework's FieldFilterLeafReader.
-public final class FieldSubsetReader extends FilterLeafReader {
+public final class FieldSubsetReader extends SequentialStoredFieldsLeafReader {
 
     /**
      * Wraps a provided DirectoryReader, exposing a subset of fields.
@@ -227,51 +229,12 @@ public final class FieldSubsetReader extends FilterLeafReader {
 
     @Override
     public void document(final int docID, final StoredFieldVisitor visitor) throws IOException {
-        super.document(docID, new StoredFieldVisitor() {
-            @Override
-            public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
-                if (SourceFieldMapper.NAME.equals(fieldInfo.name)) {
-                    // for _source, parse, filter out the fields we care about, and serialize back downstream
-                    BytesReference bytes = new BytesArray(value);
-                    Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(bytes, true);
-                    Map<String, Object> transformedSource = filter(result.v2(), filter, 0);
-                    XContentBuilder xContentBuilder = XContentBuilder.builder(result.v1().xContent()).map(transformedSource);
-                    visitor.binaryField(fieldInfo, BytesReference.toBytes(BytesReference.bytes(xContentBuilder)));
-                } else {
-                    visitor.binaryField(fieldInfo, value);
-                }
-            }
-
-            @Override
-            public void stringField(FieldInfo fieldInfo, byte[] value) throws IOException {
-                visitor.stringField(fieldInfo, value);
-            }
-
-            @Override
-            public void intField(FieldInfo fieldInfo, int value) throws IOException {
-                visitor.intField(fieldInfo, value);
-            }
-
-            @Override
-            public void longField(FieldInfo fieldInfo, long value) throws IOException {
-                visitor.longField(fieldInfo, value);
-            }
-
-            @Override
-            public void floatField(FieldInfo fieldInfo, float value) throws IOException {
-                visitor.floatField(fieldInfo, value);
-            }
-
-            @Override
-            public void doubleField(FieldInfo fieldInfo, double value) throws IOException {
-                visitor.doubleField(fieldInfo, value);
-            }
+        super.document(docID, new FieldSubsetStoredFieldVisitor(visitor));
+    }
 
-            @Override
-            public Status needsField(FieldInfo fieldInfo) throws IOException {
-                return hasField(fieldInfo.name) ? visitor.needsField(fieldInfo) : Status.NO;
-            }
-        });
+    @Override
+    protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReader reader) {
+        return new FieldSubsetStoredFieldsReader(reader);
     }
 
     @Override
@@ -321,6 +284,102 @@ public final class FieldSubsetReader extends FilterLeafReader {
         return in.getReaderCacheHelper();
     }
 
+    /**
+     * StoredFields impl that filters out stored fields and source fields that should not be visible.
+     */
+    class FieldSubsetStoredFieldsReader extends StoredFieldsReader {
+        final StoredFieldsReader reader;
+
+        FieldSubsetStoredFieldsReader(StoredFieldsReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public void visitDocument(int docID, StoredFieldVisitor visitor) throws IOException {
+            reader.visitDocument(docID, new FieldSubsetStoredFieldVisitor(visitor));
+        }
+
+        @Override
+        public StoredFieldsReader clone() {
+            return new FieldSubsetStoredFieldsReader(reader.clone());
+        }
+
+        @Override
+        public StoredFieldsReader getMergeInstance() {
+            return new FieldSubsetStoredFieldsReader(reader.getMergeInstance());
+        }
+
+        @Override
+        public void checkIntegrity() throws IOException {
+            reader.checkIntegrity();
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return reader.ramBytesUsed();
+        }
+    }
+
+    /**
+     * A field visitor that filters out stored fields and source fields that should not be visible.
+     */
+    class FieldSubsetStoredFieldVisitor extends StoredFieldVisitor {
+        final StoredFieldVisitor visitor;
+
+        FieldSubsetStoredFieldVisitor(StoredFieldVisitor visitor) {
+            this.visitor = visitor;
+        }
+
+        @Override
+        public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
+            if (SourceFieldMapper.NAME.equals(fieldInfo.name)) {
+                // for _source, parse, filter out the fields we care about, and serialize back downstream
+                BytesReference bytes = new BytesArray(value);
+                Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(bytes, true);
+                Map<String, Object> transformedSource = filter(result.v2(), filter, 0);
+                XContentBuilder xContentBuilder = XContentBuilder.builder(result.v1().xContent()).map(transformedSource);
+                visitor.binaryField(fieldInfo, BytesReference.toBytes(BytesReference.bytes(xContentBuilder)));
+            } else {
+                visitor.binaryField(fieldInfo, value);
+            }
+        }
+
+        @Override
+        public void stringField(FieldInfo fieldInfo, byte[] value) throws IOException {
+            visitor.stringField(fieldInfo, value);
+        }
+
+        @Override
+        public void intField(FieldInfo fieldInfo, int value) throws IOException {
+            visitor.intField(fieldInfo, value);
+        }
+
+        @Override
+        public void longField(FieldInfo fieldInfo, long value) throws IOException {
+            visitor.longField(fieldInfo, value);
+        }
+
+        @Override
+        public void floatField(FieldInfo fieldInfo, float value) throws IOException {
+            visitor.floatField(fieldInfo, value);
+        }
+
+        @Override
+        public void doubleField(FieldInfo fieldInfo, double value) throws IOException {
+            visitor.doubleField(fieldInfo, value);
+        }
+
+        @Override
+        public StoredFieldVisitor.Status needsField(FieldInfo fieldInfo) throws IOException {
+            return hasField(fieldInfo.name) ? visitor.needsField(fieldInfo) : StoredFieldVisitor.Status.NO;
+        }
+    }
+
     /**
      * Filters the Fields instance from the postings.
      * <p>

+ 38 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReaderTests.java

@@ -13,6 +13,7 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
@@ -22,12 +23,14 @@ import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Bits;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 
@@ -215,6 +218,41 @@ public class DocumentSubsetReaderTests extends ESTestCase {
         IOUtils.close(ir, ir2, iw, dir);
     }
 
+    public void testProducesStoredFieldsReader() throws Exception {
+        Directory dir = newDirectory();
+        IndexWriterConfig iwc = new IndexWriterConfig(null);
+        iwc.setMaxBufferedDocs(100);
+        iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+        IndexWriter iw = new IndexWriter(dir, iwc);
+
+        // add two docs, id:0 and id:1
+        Document doc = new Document();
+        Field idField = new StringField("id", "", Field.Store.NO);
+        doc.add(idField);
+        idField.setStringValue("0");
+        iw.addDocument(doc);
+        iw.commit();
+
+        idField.setStringValue("1");
+        iw.addDocument(doc);
+        iw.commit();
+
+        // open reader
+        DirectoryReader reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw), new ShardId("_index", "_na_", 0));
+        reader = DocumentSubsetReader.wrap(reader, bitsetCache, new MatchAllDocsQuery());
+        assertEquals(2, reader.numDocs());
+        assertEquals(2, reader.leaves().size());
+
+        TestUtil.checkReader(reader);
+        assertThat(reader.leaves().size(), Matchers.greaterThanOrEqualTo(1));
+        for (LeafReaderContext context: reader.leaves()) {
+            assertThat(context.reader(), Matchers.instanceOf(SequentialStoredFieldsLeafReader.class));
+            SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) context.reader();
+            assertNotNull(lf.getSequentialStoredFieldsReader());
+        }
+        IOUtils.close(reader, iw, dir);
+    }
+
     private void openDirectoryReader() throws IOException {
         directoryReader = DirectoryReader.open(directory);
         directoryReader = ElasticsearchDirectoryReader.wrap(directoryReader, new ShardId("_index", "_na_", 0));

+ 36 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/FieldSubsetReaderTests.java

@@ -27,6 +27,7 @@ import org.apache.lucene.index.FilterDirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
@@ -53,6 +54,7 @@ import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -63,6 +65,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissions;
 import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissionsDefinition;
 import org.elasticsearch.xpack.core.security.support.Automatons;
+import org.hamcrest.Matchers;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -1151,6 +1154,39 @@ public class FieldSubsetReaderTests extends ESTestCase {
         }
     }
 
+    public void testProducesStoredFieldsReader() throws Exception {
+        Directory dir = newDirectory();
+        IndexWriterConfig iwc = new IndexWriterConfig(null);
+        IndexWriter iw = new IndexWriter(dir, iwc);
+
+        // add document with 2 fields
+        Document doc = new Document();
+        doc.add(new StringField("fieldA", "test", Field.Store.NO));
+        doc.add(new StringField("fieldB", "test", Field.Store.NO));
+        iw.addDocument(doc);
+        iw.commit();
+
+        // add document with 2 fields
+        doc = new Document();
+        doc.add(new StringField("fieldA", "test2", Field.Store.NO));
+        doc.add(new StringField("fieldB", "test2", Field.Store.NO));
+        iw.addDocument(doc);
+        iw.commit();
+
+        // open reader
+        Automaton automaton = Automatons.patterns(Arrays.asList("fieldA", SourceFieldMapper.NAME));
+        DirectoryReader ir = FieldSubsetReader.wrap(DirectoryReader.open(iw), new CharacterRunAutomaton(automaton));
+
+        TestUtil.checkReader(ir);
+        assertThat(ir.leaves().size(), Matchers.greaterThanOrEqualTo(1));
+        for (LeafReaderContext context: ir.leaves()) {
+            assertThat(context.reader(), Matchers.instanceOf(SequentialStoredFieldsLeafReader.class));
+            SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) context.reader();
+            assertNotNull(lf.getSequentialStoredFieldsReader());
+        }
+        IOUtils.close(ir, iw, dir);
+    }
+
     private static final String DOC_TEST_ITEM = "{\n" +
             "  \"field_text\" : \"text\",\n" +
             "  \"object\" : {\n" +