Bläddra i källkod

Speed up PK lookups at index time. (#19856)

At index time Elasticsearch needs to look up the version associated with the
`_id` of the document that is being indexed, which is often the bottleneck for
indexing.

While reviewing the output of the `jfr` telemetry from a Rally benchmark, I saw
that significant time was spent in `ConcurrentHashMap#get` and `ThreadLocal#get`.
The reason is that we cache lookup objects per thread and segment, and for every
indexed document, we first need to look up the cache associated with this
segment (`ConcurrentHashMap#get`) and then get a state that is local to the
current thread (`ThreadLocal#get`). So if you are indexing N documents per
second and have S segments, both these methods will be called N*S times per
second.

This commit changes version lookup to use a cache per index reader rather than
per segment. While this makes cache entries live for less long, we now only need
to do one call to `ConcurrentHashMap#get` and `ThreadLocal#get` per indexed
document.
Adrien Grand 8 år sedan
förälder
incheckning
5a6fa62844

+ 20 - 24
core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

@@ -43,12 +43,21 @@ import java.io.IOException;
  *  not thread safe, so it is the caller's job to create and use one
  *  instance of this per thread.  Do not use this if a term may appear
  *  in more than one document!  It will only return the first one it
- *  finds. */
-
+ *  finds.
+ *  This class uses live docs, so it should be cached based on the
+ *  {@link org.apache.lucene.index.IndexReader#getReaderCacheHelper() reader cache helper}
+ *  rather than the {@link LeafReader#getCoreCacheHelper() core cache helper}.
+ */
 final class PerThreadIDVersionAndSeqNoLookup {
     // TODO: do we really need to store all this stuff? some if it might not speed up anything.
     // we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff
 
+    /** The {@link LeafReaderContext} that needs to be looked up. */
+    private final LeafReaderContext context;
+    /** Live docs of the context, cached to avoid the cost of ensureOpen() on every
+     *  segment for every index operation. */
+    private final Bits liveDocs;
+
     /** terms enum for uid field */
     final String uidField;
     private final TermsEnum termsEnum;
@@ -62,7 +71,10 @@ final class PerThreadIDVersionAndSeqNoLookup {
     /**
      * Initialize lookup for the provided segment
      */
-    PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
+    PerThreadIDVersionAndSeqNoLookup(LeafReaderContext context, String uidField) throws IOException {
+        this.context = context;
+        final LeafReader reader = context.reader();
+        this.liveDocs = reader.getLiveDocs();
         this.uidField = uidField;
         Fields fields = reader.fields();
         Terms terms = fields.terms(uidField);
@@ -80,11 +92,11 @@ final class PerThreadIDVersionAndSeqNoLookup {
     }
 
     /** Return null if id is not found. */
-    public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context)
+    public DocIdAndVersion lookupVersion(BytesRef id)
         throws IOException {
         assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
             "context's reader is not the same as the reader class was initialized on.";
-        int docID = getDocID(id, liveDocs);
+        int docID = getDocID(id);
 
         if (docID != DocIdSetIterator.NO_MORE_DOCS) {
             final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
@@ -104,7 +116,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
      * returns the internal lucene doc id for the given id bytes.
      * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
      * */
-    private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
+    private int getDocID(BytesRef id) throws IOException {
         if (termsEnum.seekExact(id)) {
             int docID = DocIdSetIterator.NO_MORE_DOCS;
             // there may be more than one matching docID, in the case of nested docs, so we want the last one:
@@ -122,10 +134,8 @@ final class PerThreadIDVersionAndSeqNoLookup {
     }
 
     /** Return null if id is not found. */
-    DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
-        assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
-            "context's reader is not the same as the reader class was initialized on.";
-        int docID = getDocID(id, liveDocs);
+    DocIdAndSeqNo lookupSeqNo(BytesRef id) throws IOException {
+        int docID = getDocID(id);
         if (docID != DocIdSetIterator.NO_MORE_DOCS) {
             NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
             long seqNo;
@@ -139,18 +149,4 @@ final class PerThreadIDVersionAndSeqNoLookup {
             return null;
         }
     }
-
-    /**
-     * returns 0 if the primary term is not found.
-     *
-     * Note that 0 is an illegal primary term. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
-     **/
-    long lookUpPrimaryTerm(int docID, LeafReader reader) throws IOException {
-        NumericDocValues primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
-        if (primaryTerms != null && primaryTerms.advanceExact(docID)) {
-            return primaryTerms.longValue();
-        } else {
-            return 0;
-        }
-    }
 }

+ 40 - 29
core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

@@ -20,11 +20,12 @@
 package org.elasticsearch.common.lucene.uid;
 
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.util.CloseableThreadLocal;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 
 import java.io.IOException;
 import java.util.List;
@@ -36,26 +37,31 @@ import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
 /** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
 public final class VersionsAndSeqNoResolver {
 
-    static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup>> lookupStates =
+    static final ConcurrentMap<IndexReader.CacheKey, CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]>> lookupStates =
         ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
 
     // Evict this reader from lookupStates once it's closed:
     private static final IndexReader.ClosedListener removeLookupState = key -> {
-        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.remove(key);
+        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.remove(key);
         if (ctl != null) {
             ctl.close();
         }
     };
 
-    private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader, String uidField) throws IOException {
-        IndexReader.CacheHelper cacheHelper = reader.getCoreCacheHelper();
-        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.get(cacheHelper.getKey());
+    private static PerThreadIDVersionAndSeqNoLookup[] getLookupState(IndexReader reader, String uidField) throws IOException {
+        // We cache on the top level
+        // This means cache entries have a shorter lifetime, maybe as low as 1s with the
+        // default refresh interval and a steady indexing rate, but on the other hand it
+        // proved to be cheaper than having to perform a CHM and a TL get for every segment.
+        // See https://github.com/elastic/elasticsearch/pull/19856.
+        IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper();
+        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.get(cacheHelper.getKey());
         if (ctl == null) {
             // First time we are seeing this reader's core; make a new CTL:
             ctl = new CloseableThreadLocal<>();
-            CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
+            CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
             if (other == null) {
-                // Our CTL won, we must remove it when the core is closed:
+                // Our CTL won, we must remove it when the reader is closed:
                 cacheHelper.addClosedListener(removeLookupState);
             } else {
                 // Another thread beat us to it: just use their CTL:
@@ -63,13 +69,22 @@ public final class VersionsAndSeqNoResolver {
             }
         }
 
-        PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get();
+        PerThreadIDVersionAndSeqNoLookup[] lookupState = ctl.get();
         if (lookupState == null) {
-            lookupState = new PerThreadIDVersionAndSeqNoLookup(reader, uidField);
+            lookupState = new PerThreadIDVersionAndSeqNoLookup[reader.leaves().size()];
+            for (LeafReaderContext leaf : reader.leaves()) {
+                lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf, uidField);
+            }
             ctl.set(lookupState);
-        } else if (Objects.equals(lookupState.uidField, uidField) == false) {
+        }
+
+        if (lookupState.length != reader.leaves().size()) {
+            throw new AssertionError("Mismatched numbers of leaves: " + lookupState.length + " != " + reader.leaves().size());
+        }
+
+        if (lookupState.length > 0 && Objects.equals(lookupState[0].uidField, uidField) == false) {
             throw new AssertionError("Index does not consistently use the same uid field: ["
-                    + uidField + "] != [" + lookupState.uidField + "]");
+                    + uidField + "] != [" + lookupState[0].uidField + "]");
         }
 
         return lookupState;
@@ -112,17 +127,13 @@ public final class VersionsAndSeqNoResolver {
      * </ul>
      */
     public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
+        PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
         List<LeafReaderContext> leaves = reader.leaves();
-        if (leaves.isEmpty()) {
-            return null;
-        }
         // iterate backwards to optimize for the frequently updated documents
         // which are likely to be in the last segments
         for (int i = leaves.size() - 1; i >= 0; i--) {
-            LeafReaderContext context = leaves.get(i);
-            LeafReader leaf = context.reader();
-            PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
-            DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
+            PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord];
+            DocIdAndVersion result = lookup.lookupVersion(term.bytes());
             if (result != null) {
                 return result;
             }
@@ -137,17 +148,13 @@ public final class VersionsAndSeqNoResolver {
      * </ul>
      */
     public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
+        PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
         List<LeafReaderContext> leaves = reader.leaves();
-        if (leaves.isEmpty()) {
-            return null;
-        }
         // iterate backwards to optimize for the frequently updated documents
         // which are likely to be in the last segments
         for (int i = leaves.size() - 1; i >= 0; i--) {
-            LeafReaderContext context = leaves.get(i);
-            LeafReader leaf = context.reader();
-            PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
-            DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context);
+            PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord];
+            DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes());
             if (result != null) {
                 return result;
             }
@@ -159,9 +166,13 @@ public final class VersionsAndSeqNoResolver {
      * Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
      */
     public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
-        LeafReader leaf = docIdAndSeqNo.context.reader();
-        PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, uidField);
-        long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId, leaf);
+        NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
+        long result;
+        if (primaryTerms != null && primaryTerms.advanceExact(docIdAndSeqNo.docId)) {
+            result = primaryTerms.longValue();
+        } else {
+            result = 0;
+        }
         assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
             + " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
         return result;

+ 32 - 21
core/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java

@@ -26,10 +26,10 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.Term;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -46,23 +46,31 @@ public class VersionLookupTests extends ESTestCase {
      */
     public void testSimple() throws Exception {
         Directory dir = newDirectory();
-        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
+        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+                // to have deleted docs
+                .setMergePolicy(NoMergePolicy.INSTANCE));
         Document doc = new Document();
         doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
         doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
         writer.addDocument(doc);
+        writer.addDocument(new Document());
         DirectoryReader reader = DirectoryReader.open(writer);
         LeafReaderContext segment = reader.leaves().get(0);
-        PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
+        PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
         // found doc
-        DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
+        DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"));
         assertNotNull(result);
         assertEquals(87, result.version);
         assertEquals(0, result.docId);
         // not found doc
-        assertNull(lookup.lookupVersion(new BytesRef("7"), null, segment));
+        assertNull(lookup.lookupVersion(new BytesRef("7")));
         // deleted doc
-        assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(1), segment));
+        writer.deleteDocuments(new Term(IdFieldMapper.NAME, "6"));
+        reader.close();
+        reader = DirectoryReader.open(writer);
+        segment = reader.leaves().get(0);
+        lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
+        assertNull(lookup.lookupVersion(new BytesRef("6")));
         reader.close();
         writer.close();
         dir.close();
@@ -73,36 +81,39 @@ public class VersionLookupTests extends ESTestCase {
      */
     public void testTwoDocuments() throws Exception {
         Directory dir = newDirectory();
-        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
+        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+                .setMergePolicy(NoMergePolicy.INSTANCE));
         Document doc = new Document();
         doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
         doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
         writer.addDocument(doc);
         writer.addDocument(doc);
+        writer.addDocument(new Document());
         DirectoryReader reader = DirectoryReader.open(writer);
         LeafReaderContext segment = reader.leaves().get(0);
-        PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME);
+        PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
         // return the last doc when there are duplicates
-        DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), null, segment);
+        DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"));
         assertNotNull(result);
         assertEquals(87, result.version);
         assertEquals(1, result.docId);
         // delete the first doc only
-        FixedBitSet live = new FixedBitSet(2);
-        live.set(1);
-        result = lookup.lookupVersion(new BytesRef("6"), live, segment);
+        assertTrue(writer.tryDeleteDocument(reader, 0) >= 0);
+        reader.close();
+        reader = DirectoryReader.open(writer);
+        segment = reader.leaves().get(0);
+        lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
+        result = lookup.lookupVersion(new BytesRef("6"));
         assertNotNull(result);
         assertEquals(87, result.version);
         assertEquals(1, result.docId);
-        // delete the second doc only
-        live.clear(1);
-        live.set(0);
-        result = lookup.lookupVersion(new BytesRef("6"), live, segment);
-        assertNotNull(result);
-        assertEquals(87, result.version);
-        assertEquals(0, result.docId);
         // delete both docs
-        assertNull(lookup.lookupVersion(new BytesRef("6"), new Bits.MatchNoBits(2), segment));
+        assertTrue(writer.tryDeleteDocument(reader, 1) >= 0);
+        reader.close();
+        reader = DirectoryReader.open(writer);
+        segment = reader.leaves().get(0);
+        lookup = new PerThreadIDVersionAndSeqNoLookup(segment, IdFieldMapper.NAME);
+        assertNull(lookup.lookupVersion(new BytesRef("6")));
         reader.close();
         writer.close();
         dir.close();