浏览代码

Allow realtime get to read from translog (#48843)

The realtime GET API currently has erratic performance in case where a document is accessed
that has just been indexed but not refreshed yet, as the implementation will currently force an
internal refresh in that case. Refreshing can be an expensive operation, and also will block the
thread that executes the GET operation, blocking other GETs to be processed. In case of
frequent access of recently indexed documents, this can lead to a refresh storm and terrible
GET performance.

While older versions of Elasticsearch (2.x and older) did not trigger refreshes and instead opted
to read from the translog in case of realtime GET API or update API, this was removed in 5.0
(#20102) to avoid inconsistencies between values that were returned from the translog and
those returned by the index. This was partially reverted in 6.3 (#29264) to allow _update and
upsert to read from the translog again as it was easier to guarantee consistency for these, and
also brought back more predictable performance characteristics of this API. Calls to the realtime
GET API, however, would still always do a refresh if necessary to return consistent results. This
means that users that were calling realtime GET APIs to coordinate updates on client side
(realtime GET + CAS for conditional index of updated doc) would still see very erratic
performance.

This PR (together with #48707) resolves the inconsistencies between reading from translog and
index. In particular it fixes the inconsistencies that happen when requesting stored fields, which
were not available when reading from translog. In case where stored fields are requested, this
PR will reparse the _source from the translog and derive the stored fields to be returned. With
this, it changes the realtime GET API to allow reading from the translog again, avoid refresh
storms and blocking the GET threadpool, and provide overall much better and predictable
performance for this API.
Yannick Welsch 6 年之前
父节点
当前提交
01030caf8e

+ 5 - 5
docs/reference/docs/get.asciidoc

@@ -35,11 +35,11 @@ that it exists.
 ===== Realtime
 ===== Realtime
 
 
 By default, the get API is realtime, and is not affected by the refresh
 By default, the get API is realtime, and is not affected by the refresh
-rate of the index (when data will become visible for search). If a document
-has been updated but is not yet refreshed, the get API will issue a refresh
-call in-place to make the document visible. This will also make other documents
-changed since the last refresh visible. In order to disable realtime GET,
-one can set the `realtime` parameter to `false`.
+rate of the index (when data will become visible for search). In case where
+stored fields are requested (see `stored_fields` parameter) and the document
+has been updated but is not yet refreshed, the get API will have to parse
+and analyze the source to extract the stored fields. In order to disable
+realtime GET, the `realtime` parameter can be set to `false`.
 
 
 [float]
 [float]
 [[get-source-filtering]]
 [[get-source-filtering]]

+ 5 - 2
server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java

@@ -35,6 +35,7 @@ import org.apache.lucene.index.StoredFieldVisitor;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
@@ -44,11 +45,12 @@ import org.elasticsearch.index.translog.Translog;
 import java.io.IOException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Collections;
+import java.util.Set;
 
 
 /**
 /**
  * Internal class that mocks a single doc read from the transaction log as a leaf reader.
  * Internal class that mocks a single doc read from the transaction log as a leaf reader.
  */
  */
-final class TranslogLeafReader extends LeafReader {
+public final class TranslogLeafReader extends LeafReader {
 
 
     private final Translog.Index operation;
     private final Translog.Index operation;
     private static final FieldInfo FAKE_SOURCE_FIELD
     private static final FieldInfo FAKE_SOURCE_FIELD
@@ -60,6 +62,7 @@ final class TranslogLeafReader extends LeafReader {
     private static final FieldInfo FAKE_ID_FIELD
     private static final FieldInfo FAKE_ID_FIELD
         = new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
         = new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
         0, 0, 0, false);
         0, 0, 0, false);
+    public static Set<String> ALL_FIELD_NAMES = Sets.newHashSet(FAKE_SOURCE_FIELD.name, FAKE_ROUTING_FIELD.name, FAKE_ID_FIELD.name);
 
 
     TranslogLeafReader(Translog.Index operation) {
     TranslogLeafReader(Translog.Index operation) {
         this.operation = operation;
         this.operation = operation;
@@ -161,7 +164,7 @@ final class TranslogLeafReader extends LeafReader {
             BytesRef bytesRef = Uid.encodeId(operation.id());
             BytesRef bytesRef = Uid.encodeId(operation.id());
             final byte[] id = new byte[bytesRef.length];
             final byte[] id = new byte[bytesRef.length];
             System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length);
             System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length);
-            visitor.stringField(FAKE_ID_FIELD, id);
+            visitor.binaryField(FAKE_ID_FIELD, id);
         }
         }
     }
     }
 
 

+ 1 - 2
server/src/main/java/org/elasticsearch/index/fieldvisitor/CustomFieldsVisitor.java

@@ -20,7 +20,6 @@ package org.elasticsearch.index.fieldvisitor;
 
 
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfo;
 
 
-import java.io.IOException;
 import java.util.Set;
 import java.util.Set;
 
 
 /**
 /**
@@ -39,7 +38,7 @@ public class CustomFieldsVisitor extends FieldsVisitor {
     }
     }
 
 
     @Override
     @Override
-    public Status needsField(FieldInfo fieldInfo) throws IOException {
+    public Status needsField(FieldInfo fieldInfo) {
         if (super.needsField(fieldInfo) == Status.YES) {
         if (super.needsField(fieldInfo) == Status.YES) {
             return Status.YES;
             return Status.YES;
         }
         }

+ 21 - 10
server/src/main/java/org/elasticsearch/index/fieldvisitor/FieldsVisitor.java

@@ -32,7 +32,6 @@ import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.Uid;
 
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -72,7 +71,7 @@ public class FieldsVisitor extends StoredFieldVisitor {
     }
     }
 
 
     @Override
     @Override
-    public Status needsField(FieldInfo fieldInfo) throws IOException {
+    public Status needsField(FieldInfo fieldInfo) {
         if (requiredFields.remove(fieldInfo.name)) {
         if (requiredFields.remove(fieldInfo.name)) {
             return Status.YES;
             return Status.YES;
         }
         }
@@ -108,42 +107,54 @@ public class FieldsVisitor extends StoredFieldVisitor {
     }
     }
 
 
     @Override
     @Override
-    public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
+    public void binaryField(FieldInfo fieldInfo, byte[] value) {
+        binaryField(fieldInfo, new BytesRef(value));
+    }
+
+    public void binaryField(FieldInfo fieldInfo, BytesRef value) {
         if (sourceFieldName.equals(fieldInfo.name)) {
         if (sourceFieldName.equals(fieldInfo.name)) {
             source = new BytesArray(value);
             source = new BytesArray(value);
         } else if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
         } else if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
-            id = Uid.decodeId(value);
+            id = Uid.decodeId(value.bytes, value.offset, value.length);
         } else {
         } else {
-            addValue(fieldInfo.name, new BytesRef(value));
+            addValue(fieldInfo.name, value);
         }
         }
     }
     }
 
 
     @Override
     @Override
-    public void stringField(FieldInfo fieldInfo, byte[] bytes) throws IOException {
+    public void stringField(FieldInfo fieldInfo, byte[] bytes) {
+        assert IdFieldMapper.NAME.equals(fieldInfo.name) == false : "_id field must go through binaryField";
+        assert sourceFieldName.equals(fieldInfo.name) == false : "source field must go through binaryField";
         final String value = new String(bytes, StandardCharsets.UTF_8);
         final String value = new String(bytes, StandardCharsets.UTF_8);
         addValue(fieldInfo.name, value);
         addValue(fieldInfo.name, value);
     }
     }
 
 
     @Override
     @Override
-    public void intField(FieldInfo fieldInfo, int value) throws IOException {
+    public void intField(FieldInfo fieldInfo, int value) {
         addValue(fieldInfo.name, value);
         addValue(fieldInfo.name, value);
     }
     }
 
 
     @Override
     @Override
-    public void longField(FieldInfo fieldInfo, long value) throws IOException {
+    public void longField(FieldInfo fieldInfo, long value) {
         addValue(fieldInfo.name, value);
         addValue(fieldInfo.name, value);
     }
     }
 
 
     @Override
     @Override
-    public void floatField(FieldInfo fieldInfo, float value) throws IOException {
+    public void floatField(FieldInfo fieldInfo, float value) {
         addValue(fieldInfo.name, value);
         addValue(fieldInfo.name, value);
     }
     }
 
 
     @Override
     @Override
-    public void doubleField(FieldInfo fieldInfo, double value) throws IOException {
+    public void doubleField(FieldInfo fieldInfo, double value) {
         addValue(fieldInfo.name, value);
         addValue(fieldInfo.name, value);
     }
     }
 
 
+    public void objectField(FieldInfo fieldInfo, Object object) {
+        assert IdFieldMapper.NAME.equals(fieldInfo.name) == false : "_id field must go through binaryField";
+        assert sourceFieldName.equals(fieldInfo.name) == false : "source field must go through binaryField";
+        addValue(fieldInfo.name, object);
+    }
+
     public BytesReference source() {
     public BytesReference source() {
         return source;
         return source;
     }
     }

+ 105 - 47
server/src/main/java/org/elasticsearch/index/get/ShardGetService.java

@@ -19,6 +19,12 @@
 
 
 package org.elasticsearch.index.get;
 package org.elasticsearch.index.get;
 
 
+import org.apache.lucene.index.DocValuesType;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.IndexableFieldType;
+import org.apache.lucene.index.StoredFieldVisitor;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Term;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Nullable;
@@ -37,24 +43,29 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.TranslogLeafReader;
 import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor;
 import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor;
 import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
 import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -81,16 +92,16 @@ public final class ShardGetService extends AbstractIndexShardComponent {
     public GetResult get(String id, String[] gFields, boolean realtime, long version,
     public GetResult get(String id, String[] gFields, boolean realtime, long version,
                             VersionType versionType, FetchSourceContext fetchSourceContext) {
                             VersionType versionType, FetchSourceContext fetchSourceContext) {
         return
         return
-            get(id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext, false);
+            get(id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext);
     }
     }
 
 
     private GetResult get(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
     private GetResult get(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
-                          long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
+                          long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
         currentMetric.inc();
         currentMetric.inc();
         try {
         try {
             long now = System.nanoTime();
             long now = System.nanoTime();
             GetResult getResult =
             GetResult getResult =
-                innerGet(id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext, readFromTranslog);
+                innerGet(id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);
 
 
             if (getResult.isExists()) {
             if (getResult.isExists()) {
                 existsMetric.inc(System.nanoTime() - now);
                 existsMetric.inc(System.nanoTime() - now);
@@ -105,7 +116,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
 
 
     public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) {
     public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) {
         return get(id, new String[]{RoutingFieldMapper.NAME}, true,
         return get(id, new String[]{RoutingFieldMapper.NAME}, true,
-            Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE, true);
+            Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE);
     }
     }
 
 
     /**
     /**
@@ -156,13 +167,13 @@ public final class ShardGetService extends AbstractIndexShardComponent {
     }
     }
 
 
     private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
     private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
-                               long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
+                               long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
         fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
         fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
 
 
         Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
         Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
-        Engine.GetResult get = indexShard.get(new Engine.Get(realtime, readFromTranslog, id, uidTerm)
+        Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id, uidTerm)
             .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
             .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
-        assert get.isFromTranslog() == false || readFromTranslog : "should only read from translog if explicitly enabled";
+        assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
         if (get.exists() == false) {
         if (get.exists() == false) {
             get.close();
             get.close();
         }
         }
@@ -179,13 +190,33 @@ public final class ShardGetService extends AbstractIndexShardComponent {
         }
         }
     }
     }
 
 
-    private GetResult innerGetLoadFromStoredFields(String id, String[] gFields, FetchSourceContext fetchSourceContext,
-                                                        Engine.GetResult get, MapperService mapperService) {
+    private GetResult innerGetLoadFromStoredFields(String id, String[] storedFields, FetchSourceContext fetchSourceContext,
+                                                   Engine.GetResult get, MapperService mapperService) {
+        assert get.exists() : "method should only be called if document could be retrieved";
+
+        // check first if stored fields to be loaded don't contain an object field
+        DocumentMapper docMapper = mapperService.documentMapper();
+        if (storedFields != null) {
+            for (String field : storedFields) {
+                Mapper fieldMapper = docMapper.mappers().getMapper(field);
+                if (fieldMapper == null) {
+                    if (docMapper.objectMappers().get(field) != null) {
+                        // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
+                        throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
+                    }
+                }
+            }
+        }
+
         Map<String, DocumentField> documentFields = null;
         Map<String, DocumentField> documentFields = null;
         Map<String, DocumentField> metaDataFields = null;
         Map<String, DocumentField> metaDataFields = null;
         BytesReference source = null;
         BytesReference source = null;
         DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
         DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
-        FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext);
+        // force fetching source if we read from translog and need to recreate stored fields
+        boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
+                Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
+        FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
+            forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
         if (fieldVisitor != null) {
         if (fieldVisitor != null) {
             try {
             try {
                 docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
                 docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
@@ -194,6 +225,54 @@ public final class ShardGetService extends AbstractIndexShardComponent {
             }
             }
             source = fieldVisitor.source();
             source = fieldVisitor.source();
 
 
+            // in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
+            if (get.isFromTranslog()) {
+                // Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
+                // just make source consistent by reapplying source filters from mapping (possibly also nulling the source)
+                if (forceSourceForComputingTranslogStoredFields == false) {
+                    try {
+                        source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);
+                    } catch (IOException e) {
+                        throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
+                    }
+                } else {
+                    // Slow path: recreate stored fields from original source
+                    assert source != null : "original source in translog must exist";
+                    SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), id, source, XContentHelper.xContentType(source),
+                        fieldVisitor.routing());
+                    ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
+                    assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
+                    // update special fields
+                    doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
+                    doc.version().setLongValue(docIdAndVersion.version);
+
+                    // retrieve stored fields from parsed doc
+                    fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);
+                    for (IndexableField indexableField : doc.rootDoc().getFields()) {
+                        IndexableFieldType fieldType = indexableField.fieldType();
+                        if (fieldType.stored()) {
+                            FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
+                                DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
+                            StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
+                            if (status == StoredFieldVisitor.Status.YES) {
+                                if (indexableField.binaryValue() != null) {
+                                    fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
+                                } else if (indexableField.stringValue() != null) {
+                                    fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
+                                } else if (indexableField.numericValue() != null) {
+                                    fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
+                                }
+                            } else if (status == StoredFieldVisitor.Status.STOP) {
+                                break;
+                            }
+                        }
+                    }
+                    // retrieve source (with possible transformations, e.g. source filters
+                    source = fieldVisitor.source();
+                }
+            }
+
+            // put stored fields into result objects
             if (!fieldVisitor.fields().isEmpty()) {
             if (!fieldVisitor.fields().isEmpty()) {
                 fieldVisitor.postProcess(mapperService);
                 fieldVisitor.postProcess(mapperService);
                 documentFields = new HashMap<>();
                 documentFields = new HashMap<>();
@@ -208,47 +287,26 @@ public final class ShardGetService extends AbstractIndexShardComponent {
             }
             }
         }
         }
 
 
-        DocumentMapper docMapper = mapperService.documentMapper();
-
-        if (gFields != null && gFields.length > 0) {
-            for (String field : gFields) {
-                Mapper fieldMapper = docMapper.mappers().getMapper(field);
-                if (fieldMapper == null) {
-                    if (docMapper.objectMappers().get(field) != null) {
-                        // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
-                        throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
-                    }
+        if (source != null) {
+            // apply request-level source filtering
+            if (fetchSourceContext.fetchSource() == false) {
+                source = null;
+            } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
+                Map<String, Object> sourceAsMap;
+                // TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
+                //  Do we care?
+                Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
+                XContentType sourceContentType = typeMapTuple.v1();
+                sourceAsMap = typeMapTuple.v2();
+                sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
+                try {
+                    source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
+                } catch (IOException e) {
+                    throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
                 }
                 }
             }
             }
         }
         }
 
 
-        if (!fetchSourceContext.fetchSource()) {
-            source = null;
-        }
-
-        if (source != null && get.isFromTranslog()) {
-            // reapply source filters from mapping (possibly also nulling the source)
-            try {
-                source = docMapper.sourceMapper().applyFilters(source, null);
-            } catch (IOException e) {
-                throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
-            }
-        }
-
-        if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) {
-            Map<String, Object> sourceAsMap;
-            // TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
-            Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
-            XContentType sourceContentType = typeMapTuple.v1();
-            sourceAsMap = typeMapTuple.v2();
-            sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
-            try {
-                source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
-            } catch (IOException e) {
-                throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
-            }
-        }
-
         return new GetResult(shardId.getIndexName(), id, get.docIdAndVersion().seqNo, get.docIdAndVersion().primaryTerm,
         return new GetResult(shardId.getIndexName(), id, get.docIdAndVersion().seqNo, get.docIdAndVersion().primaryTerm,
             get.version(), get.exists(), source, documentFields, metaDataFields);
             get.version(), get.exists(), source, documentFields, metaDataFields);
     }
     }

+ 42 - 2
server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java

@@ -22,10 +22,12 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
@@ -89,7 +91,7 @@ public class ShardGetServiceTests extends IndexShardTestCase {
         closeShards(primary);
         closeShards(primary);
     }
     }
 
 
-    public void testGetFromTranslogWithSourceMappingOptions() throws IOException {
+    public void testGetFromTranslogWithSourceMappingOptionsAndStoredFields() throws IOException {
         Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
         Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
             .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
             .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
             .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
             .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
@@ -99,7 +101,8 @@ public class ShardGetServiceTests extends IndexShardTestCase {
         String sourceOptions = noSource ? "\"enabled\": false" : randomBoolean() ? "\"excludes\": [\"fo*\"]" : "\"includes\": [\"ba*\"]";
         String sourceOptions = noSource ? "\"enabled\": false" : randomBoolean() ? "\"excludes\": [\"fo*\"]" : "\"includes\": [\"ba*\"]";
         String expectedResult = noSource ? "" : "{\"bar\":\"bar\"}";
         String expectedResult = noSource ? "" : "{\"bar\":\"bar\"}";
         IndexMetaData metaData = IndexMetaData.builder("test")
         IndexMetaData metaData = IndexMetaData.builder("test")
-            .putMapping("test", "{ \"properties\": { \"foo\":  { \"type\": \"text\"}, \"bar\":  { \"type\": \"text\"}}, \"_source\": { "
+            .putMapping("test", "{ \"properties\": { \"foo\":  { \"type\": \"text\", \"store\": true }, " +
+                "\"bar\":  { \"type\": \"text\"}}, \"_source\": { "
                 + sourceOptions + "}}}")
                 + sourceOptions + "}}}")
             .settings(settings)
             .settings(settings)
             .primaryTerm(0, 1).build();
             .primaryTerm(0, 1).build();
@@ -114,6 +117,43 @@ public class ShardGetServiceTests extends IndexShardTestCase {
             assertEquals(searcher.getIndexReader().maxDoc(), 1); // we refreshed
             assertEquals(searcher.getIndexReader().maxDoc(), 1); // we refreshed
         }
         }
 
 
+        Engine.IndexResult test1 = indexDoc(primary, "test", "1", docToIndex,  XContentType.JSON, "foobar");
+        assertTrue(primary.getEngine().refreshNeeded());
+        GetResult testGet1 = primary.getService().getForUpdate("1", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
+        assertEquals(new String(testGet1.source() == null ? new byte[0] : testGet1.source(), StandardCharsets.UTF_8), expectedResult);
+        assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME));
+        assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue());
+        try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
+            assertEquals(searcher.getIndexReader().maxDoc(), 1); // we read from the translog
+        }
+        primary.getEngine().refresh("test");
+        try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
+            assertEquals(searcher.getIndexReader().maxDoc(), 2);
+        }
+
+        Engine.IndexResult test2 = indexDoc(primary, "test", "2", docToIndex,  XContentType.JSON, "foobar");
+        assertTrue(primary.getEngine().refreshNeeded());
+        GetResult testGet2 = primary.getService().get("2", new String[]{"foo"}, true, 1, VersionType.INTERNAL,
+            FetchSourceContext.FETCH_SOURCE);
+        assertEquals(new String(testGet2.source() == null ? new byte[0] : testGet2.source(), StandardCharsets.UTF_8), expectedResult);
+        assertTrue(testGet2.getFields().containsKey(RoutingFieldMapper.NAME));
+        assertTrue(testGet2.getFields().containsKey("foo"));
+        assertEquals("foo", testGet2.getFields().get("foo").getValue());
+        try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
+            assertEquals(searcher.getIndexReader().maxDoc(), 2); // we read from the translog
+        }
+        primary.getEngine().refresh("test");
+        try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
+            assertEquals(searcher.getIndexReader().maxDoc(), 3);
+        }
+
+        testGet2 = primary.getService().get("2", new String[]{"foo"}, true, 1, VersionType.INTERNAL,
+            FetchSourceContext.FETCH_SOURCE);
+        assertEquals(new String(testGet2.source() == null ? new byte[0] : testGet2.source(), StandardCharsets.UTF_8), expectedResult);
+        assertTrue(testGet2.getFields().containsKey(RoutingFieldMapper.NAME));
+        assertTrue(testGet2.getFields().containsKey("foo"));
+        assertEquals("foo", testGet2.getFields().get("foo").getValue());
+
         closeShards(primary);
         closeShards(primary);
     }
     }
 
 

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -750,7 +750,7 @@ public abstract class EngineTestCase extends ESTestCase {
     }
     }
 
 
     protected Engine.Get newGet(boolean realtime, ParsedDocument doc) {
     protected Engine.Get newGet(boolean realtime, ParsedDocument doc) {
-        return new Engine.Get(realtime, false, doc.id(), newUid(doc));
+        return new Engine.Get(realtime, realtime, doc.id(), newUid(doc));
     }
     }
 
 
     protected Engine.Index indexForDoc(ParsedDocument doc) {
     protected Engine.Index indexForDoc(ParsedDocument doc) {