Browse Source

Encapsulate source filtering (#91127)

We have two implementations of source filtering, one based on Map filtering
and used by SourceLookup, and one based on jackson stream filtering used
in Get and SourceFieldMapper. There are cases when stream filtering could
be usefully applied to source in the fetch phase, for example if the source is
not being used as a Map by any other subphase; and correspondingly if a
source has already been parsed to a Map then map filtering will generally
be more efficient than stream filtering that ends up re-parsing the bytes.

This commit encapsulates all of this filtering logic into a single SourceFilter
class, which can be passed to the filter method on Source. Different
Source implementations can choose to use map or stream filtering
depending on whether or not they have map or bytes representations
available.
Alan Woodward 3 years ago
parent
commit
ba7a219ac0
18 changed files with 450 additions and 224 deletions
  1. 8 2
      benchmarks/src/main/java/org/elasticsearch/benchmark/search/fetch/subphase/FetchSourcePhaseBenchmark.java
  2. 94 0
      benchmarks/src/main/java/org/elasticsearch/benchmark/search/fetch/subphase/SourceFilteringBenchmark.java
  3. 3 3
      modules/percolator/src/test/java/org/elasticsearch/percolator/PercolatorMatchedSlotSubFetchPhaseTests.java
  4. 4 18
      server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java
  5. 0 101
      server/src/main/java/org/elasticsearch/common/xcontent/XContentFieldFilter.java
  6. 2 2
      server/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java
  7. 3 8
      server/src/main/java/org/elasticsearch/index/get/ShardGetService.java
  8. 19 11
      server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
  9. 1 1
      server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
  10. 9 12
      server/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceContext.java
  11. 15 15
      server/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourcePhase.java
  12. 44 13
      server/src/main/java/org/elasticsearch/search/lookup/Source.java
  13. 108 0
      server/src/main/java/org/elasticsearch/search/lookup/SourceFilter.java
  14. 5 0
      server/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java
  15. 19 36
      server/src/test/java/org/elasticsearch/common/xcontent/support/XContentSourceFilterTests.java
  16. 1 1
      server/src/test/java/org/elasticsearch/search/fetch/subphase/FetchFieldsPhaseTests.java
  17. 1 1
      server/src/test/java/org/elasticsearch/search/fetch/subphase/FetchSourcePhaseTests.java
  18. 114 0
      server/src/test/java/org/elasticsearch/search/lookup/SourceFilterTests.java

+ 8 - 2
benchmarks/src/main/java/org/elasticsearch/benchmark/search/fetch/subphase/FetchSourcePhaseBenchmark.java

@@ -77,9 +77,15 @@ public class FetchSourcePhaseBenchmark {
     }
 
     @Benchmark
-    public BytesReference filterObjects() {
+    public BytesReference filterSourceMap() {
         Source bytesSource = Source.fromBytes(sourceBytes);
-        return Source.fromMap(bytesSource.filter(fetchContext), bytesSource.sourceContentType()).internalSourceRef();
+        return fetchContext.filter().filterMap(bytesSource).internalSourceRef();
+    }
+
+    @Benchmark
+    public BytesReference filterSourceBytes() {
+        Source bytesSource = Source.fromBytes(sourceBytes);
+        return fetchContext.filter().filterBytes(bytesSource).internalSourceRef();
     }
 
     @Benchmark

+ 94 - 0
benchmarks/src/main/java/org/elasticsearch/benchmark/search/fetch/subphase/SourceFilteringBenchmark.java

@@ -0,0 +1,94 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.benchmark.search.fetch.subphase;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.lookup.Source;
+import org.elasticsearch.search.lookup.SourceFilter;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+@Fork(1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@State(Scope.Benchmark)
+public class SourceFilteringBenchmark {
+
+    private BytesReference sourceBytes;
+    private SourceFilter filter;
+
+    @Param({ "tiny", "short", "one_4k_field", "one_4m_field" })
+    private String source;
+    @Param({ "message" })
+    private String includes;
+    @Param({ "" })
+    private String excludes;
+
+    @Setup
+    public void setup() throws IOException {
+        sourceBytes = switch (source) {
+            case "tiny" -> new BytesArray("{\"message\": \"short\"}");
+            case "short" -> read300BytesExample();
+            case "one_4k_field" -> buildBigExample("huge".repeat(1024));
+            case "one_4m_field" -> buildBigExample("huge".repeat(1024 * 1024));
+            default -> throw new IllegalArgumentException("Unknown source [" + source + "]");
+        };
+        FetchSourceContext fetchContext = FetchSourceContext.of(
+            true,
+            Strings.splitStringByCommaToArray(includes),
+            Strings.splitStringByCommaToArray(excludes)
+        );
+        filter = fetchContext.filter();
+    }
+
+    private BytesReference read300BytesExample() throws IOException {
+        return Streams.readFully(FetchSourcePhaseBenchmark.class.getResourceAsStream("300b_example.json"));
+    }
+
+    private BytesReference buildBigExample(String extraText) throws IOException {
+        String bigger = read300BytesExample().utf8ToString();
+        bigger = "{\"huge\": \"" + extraText + "\"," + bigger.substring(1);
+        return new BytesArray(bigger);
+    }
+
+    // We want to compare map filtering with bytes filtering when the map has already
+    // been parsed.
+
+    @Benchmark
+    public Source filterMap() {
+        Source source = Source.fromBytes(sourceBytes);
+        source.source();    // build map
+        return filter.filterMap(source);
+    }
+
+    @Benchmark
+    public Source filterBytes() {
+        Source source = Source.fromBytes(sourceBytes);
+        source.source();    // build map
+        return filter.filterBytes(source);
+    }
+}

+ 3 - 3
modules/percolator/src/test/java/org/elasticsearch/percolator/PercolatorMatchedSlotSubFetchPhaseTests.java

@@ -55,7 +55,7 @@ public class PercolatorMatchedSlotSubFetchPhaseTests extends ESTestCase {
                 LeafReaderContext context = reader.leaves().get(0);
                 // A match:
                 {
-                    HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
+                    HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.empty(null));
                     PercolateQuery.QueryStore queryStore = ctx -> docId -> new TermQuery(new Term("field", "value"));
                     MemoryIndex memoryIndex = new MemoryIndex();
                     memoryIndex.addField("field", "value", new WhitespaceAnalyzer());
@@ -86,7 +86,7 @@ public class PercolatorMatchedSlotSubFetchPhaseTests extends ESTestCase {
 
                 // No match:
                 {
-                    HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
+                    HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.empty(null));
                     PercolateQuery.QueryStore queryStore = ctx -> docId -> new TermQuery(new Term("field", "value"));
                     MemoryIndex memoryIndex = new MemoryIndex();
                     memoryIndex.addField("field", "value1", new WhitespaceAnalyzer());
@@ -116,7 +116,7 @@ public class PercolatorMatchedSlotSubFetchPhaseTests extends ESTestCase {
 
                 // No query:
                 {
-                    HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.EMPTY);
+                    HitContext hit = new HitContext(new SearchHit(0), context, 0, Source.empty(null));
                     PercolateQuery.QueryStore queryStore = ctx -> docId -> null;
                     MemoryIndex memoryIndex = new MemoryIndex();
                     memoryIndex.addField("field", "value", new WhitespaceAnalyzer());

+ 4 - 18
server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

@@ -10,13 +10,11 @@ package org.elasticsearch.action.update;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.internal.Requests;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Nullable;
@@ -34,8 +32,7 @@ import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.UpdateCtxMap;
 import org.elasticsearch.script.UpdateScript;
 import org.elasticsearch.script.UpsertCtxMap;
-import org.elasticsearch.search.lookup.SourceLookup;
-import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.search.lookup.Source;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
@@ -315,6 +312,7 @@ public class UpdateHelper {
 
     /**
      * Applies {@link UpdateRequest#fetchSource()} to the _source of the updated document to be returned in a update response.
+     * // TODO can we pass a Source here rather than Map, XcontentType and BytesReference?
      */
     public static GetResult extractGetResult(
         final UpdateRequest request,
@@ -329,21 +327,9 @@ public class UpdateHelper {
         if (request.fetchSource() == null || request.fetchSource().fetchSource() == false) {
             return null;
         }
-
         BytesReference sourceFilteredAsBytes = sourceAsBytes;
-        if (request.fetchSource().includes().length > 0 || request.fetchSource().excludes().length > 0) {
-            SourceLookup sourceLookup = new SourceLookup(new SourceLookup.MapSourceProvider(source));
-            Object value = sourceLookup.filter(request.fetchSource());
-            try {
-                final int initialCapacity = sourceAsBytes != null ? Math.min(1024, sourceAsBytes.length()) : 1024;
-                BytesStreamOutput streamOutput = new BytesStreamOutput(initialCapacity);
-                try (XContentBuilder builder = new XContentBuilder(sourceContentType.xContent(), streamOutput)) {
-                    builder.value(value);
-                    sourceFilteredAsBytes = BytesReference.bytes(builder);
-                }
-            } catch (IOException e) {
-                throw new ElasticsearchException("Error filtering source", e);
-            }
+        if (request.fetchSource().hasFilter()) {
+            sourceFilteredAsBytes = Source.fromMap(source, sourceContentType).filter(request.fetchSource().filter()).internalSourceRef();
         }
 
         // TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType)

+ 0 - 101
server/src/main/java/org/elasticsearch/common/xcontent/XContentFieldFilter.java

@@ -1,101 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.common.xcontent;
-
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.util.CollectionUtils;
-import org.elasticsearch.common.xcontent.support.XContentMapValues;
-import org.elasticsearch.core.CheckedFunction;
-import org.elasticsearch.core.Nullable;
-import org.elasticsearch.core.Tuple;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
-import org.elasticsearch.xcontent.XContentParser;
-import org.elasticsearch.xcontent.XContentParserConfiguration;
-import org.elasticsearch.xcontent.XContentType;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-
-/**
- * A filter that filter fields away from source
- */
-public interface XContentFieldFilter {
-    /**
-     * filter source in {@link BytesReference} format and in {@link XContentType} content type
-     * note that xContentType may be null in some case, we should guess xContentType from sourceBytes in such cases
-     */
-    BytesReference apply(BytesReference sourceBytes, @Nullable XContentType xContentType) throws IOException;
-
-    /**
-     * Construct {@link XContentFieldFilter} using given includes and excludes
-     *
-     * @param includes fields to keep, wildcard supported
-     * @param excludes fields to remove, wildcard supported
-     * @return filter that filter {@link org.elasticsearch.xcontent.XContent} with given includes and excludes
-     */
-    static XContentFieldFilter newFieldFilter(String[] includes, String[] excludes) {
-        final CheckedFunction<XContentType, BytesReference, IOException> emptyValueSupplier = xContentType -> {
-            BytesStreamOutput bStream = new BytesStreamOutput();
-            XContentBuilder builder = XContentFactory.contentBuilder(xContentType, bStream).map(Collections.emptyMap());
-            builder.close();
-            return bStream.bytes();
-        };
-        // Use the old map-based filtering mechanism if there are wildcards in the excludes.
-        // TODO: Remove this if block once: https://github.com/elastic/elasticsearch/pull/80160 is merged
-        if ((CollectionUtils.isEmpty(excludes) == false) && Arrays.stream(excludes).filter(field -> field.contains("*")).count() > 0) {
-            return (originalSource, contentType) -> {
-                if (originalSource == null || originalSource.length() <= 0) {
-                    if (contentType == null) {
-                        throw new IllegalStateException("originalSource and contentType can not be null at the same time");
-                    }
-                    return emptyValueSupplier.apply(contentType);
-                }
-                Function<Map<String, ?>, Map<String, Object>> mapFilter = XContentMapValues.filter(includes, excludes);
-                Tuple<XContentType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(originalSource, true, contentType);
-                Map<String, Object> filteredSource = mapFilter.apply(mapTuple.v2());
-                BytesStreamOutput bStream = new BytesStreamOutput();
-                XContentType actualContentType = mapTuple.v1();
-                XContentBuilder builder = XContentFactory.contentBuilder(actualContentType, bStream).map(filteredSource);
-                builder.close();
-                return bStream.bytes();
-            };
-        } else {
-            final XContentParserConfiguration parserConfig = XContentParserConfiguration.EMPTY.withFiltering(
-                Set.copyOf(Arrays.asList(includes)),
-                Set.copyOf(Arrays.asList(excludes)),
-                true
-            );
-            return (originalSource, contentType) -> {
-                if (originalSource == null || originalSource.length() <= 0) {
-                    if (contentType == null) {
-                        throw new IllegalStateException("originalSource and contentType can not be null at the same time");
-                    }
-                    return emptyValueSupplier.apply(contentType);
-                }
-                if (contentType == null) {
-                    contentType = XContentHelper.xContentTypeMayCompressed(originalSource);
-                }
-                BytesStreamOutput streamOutput = new BytesStreamOutput(Math.min(1024, originalSource.length()));
-                XContentBuilder builder = new XContentBuilder(contentType.xContent(), streamOutput);
-                XContentParser parser = contentType.xContent().createParser(parserConfig, originalSource.streamInput());
-                if ((parser.currentToken() == null) && (parser.nextToken() == null)) {
-                    return emptyValueSupplier.apply(contentType);
-                }
-                builder.copyCurrentStructure(parser);
-                return BytesReference.bytes(builder);
-            };
-        }
-    }
-}

+ 2 - 2
server/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java

@@ -252,7 +252,7 @@ public class XContentMapValues {
      * document contains {@code a.b} as a property and {@code a} is an include,
      * then {@code a.b} will be kept in the filtered map.
      */
-    public static Map<String, Object> filter(Map<String, ?> map, String[] includes, String[] excludes) {
+    public static Map<String, Object> filter(Map<String, Object> map, String[] includes, String[] excludes) {
         return filter(includes, excludes).apply(map);
     }
 
@@ -260,7 +260,7 @@ public class XContentMapValues {
      * Returns a function that filters a document map based on the given include and exclude rules.
      * @see #filter(Map, String[], String[]) for details
      */
-    public static Function<Map<String, ?>, Map<String, Object>> filter(String[] includes, String[] excludes) {
+    public static Function<Map<String, Object>, Map<String, Object>> filter(String[] includes, String[] excludes) {
         CharacterRunAutomaton matchAllAutomaton = new CharacterRunAutomaton(Automata.makeAnyString());
 
         CharacterRunAutomaton include;

+ 3 - 8
server/src/main/java/org/elasticsearch/index/get/ShardGetService.java

@@ -15,7 +15,6 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.metrics.MeanMetric;
-import org.elasticsearch.common.xcontent.XContentFieldFilter;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
@@ -32,6 +31,7 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.lookup.Source;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -282,13 +282,8 @@ public final class ShardGetService extends AbstractIndexShardComponent {
             // apply request-level source filtering
             if (fetchSourceContext.fetchSource() == false) {
                 source = null;
-            } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
-                try {
-                    source = XContentFieldFilter.newFieldFilter(fetchSourceContext.includes(), fetchSourceContext.excludes())
-                        .apply(source, null);
-                } catch (IOException e) {
-                    throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
-                }
+            } else if (fetchSourceContext.hasFilter()) {
+                source = Source.fromBytes(source).filter(fetchSourceContext.filter()).internalSourceRef();
             }
         }
 

+ 19 - 11
server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java

@@ -18,10 +18,11 @@ import org.elasticsearch.common.Explicit;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.util.CollectionUtils;
-import org.elasticsearch.common.xcontent.XContentFieldFilter;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.query.QueryShardException;
 import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.search.lookup.Source;
+import org.elasticsearch.search.lookup.SourceFilter;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
@@ -35,7 +36,6 @@ public class SourceFieldMapper extends MetadataFieldMapper {
     public static final String RECOVERY_SOURCE_NAME = "_recovery_source";
 
     public static final String CONTENT_TYPE = "_source";
-    private final XContentFieldFilter filter;
 
     /** The source mode */
     private enum Mode {
@@ -171,22 +171,27 @@ public class SourceFieldMapper extends MetadataFieldMapper {
 
     private final String[] includes;
     private final String[] excludes;
+    private final SourceFilter sourceFilter;
 
     private SourceFieldMapper(Mode mode, Explicit<Boolean> enabled, String[] includes, String[] excludes) {
         super(new SourceFieldType((enabled.explicit() && enabled.value()) || (enabled.explicit() == false && mode != Mode.DISABLED)));
         assert enabled.explicit() == false || mode == null;
         this.mode = mode;
         this.enabled = enabled;
+        this.sourceFilter = buildSourceFilter(includes, excludes);
         this.includes = includes;
         this.excludes = excludes;
-        final boolean filtered = CollectionUtils.isEmpty(includes) == false || CollectionUtils.isEmpty(excludes) == false;
-        if (filtered && mode == Mode.SYNTHETIC) {
+        if (this.sourceFilter != null && mode == Mode.SYNTHETIC) {
             throw new IllegalArgumentException("filtering the stored _source is incompatible with synthetic source");
         }
-        this.filter = stored() && filtered
-            ? XContentFieldFilter.newFieldFilter(includes, excludes)
-            : (sourceBytes, contentType) -> sourceBytes;
-        this.complete = stored() && CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes);
+        this.complete = stored() && sourceFilter == null;
+    }
+
+    private static SourceFilter buildSourceFilter(String[] includes, String[] excludes) {
+        if (CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes)) {
+            return null;
+        }
+        return new SourceFilter(includes, excludes);
     }
 
     private boolean stored() {
@@ -231,11 +236,14 @@ public class SourceFieldMapper extends MetadataFieldMapper {
 
     @Nullable
     public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable XContentType contentType) throws IOException {
-        if (stored() && originalSource != null) {
+        if (stored() == false) {
+            return null;
+        }
+        if (originalSource != null && sourceFilter != null) {
             // Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
-            return filter.apply(originalSource, contentType);
+            return Source.fromBytes(originalSource, contentType).filter(sourceFilter).internalSourceRef();
         } else {
-            return null;
+            return originalSource;
         }
     }
 

+ 1 - 1
server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

@@ -429,7 +429,7 @@ public class FetchPhase {
             }
             return new HitContext(hit, subReaderContext, nestedInfo.doc(), Source.fromMap(nestedSourceAsMap, rootSourceContentType));
         }
-        return new HitContext(hit, subReaderContext, nestedInfo.doc(), Source.EMPTY);
+        return new HitContext(hit, subReaderContext, nestedInfo.doc(), Source.empty(rootSourceContentType));
     }
 
     public static List<Object> processStoredField(Function<String, MappedFieldType> fieldTypeLookup, String field, List<Object> input) {

+ 9 - 12
server/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourceContext.java

@@ -13,10 +13,10 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.core.Booleans;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.search.lookup.SourceFilter;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -87,6 +87,14 @@ public class FetchSourceContext implements Writeable, ToXContentObject {
         return this.excludes;
     }
 
+    public boolean hasFilter() {
+        return this.includes.length > 0 || this.excludes.length > 0;
+    }
+
+    public SourceFilter filter() {
+        return new SourceFilter(includes, excludes);
+    }
+
     public static FetchSourceContext parseFromRestRequest(RestRequest request) {
         Boolean fetchSource = null;
         String[] sourceExcludes = null;
@@ -253,15 +261,4 @@ public class FetchSourceContext implements Writeable, ToXContentObject {
         result = 31 * result + Arrays.hashCode(excludes);
         return result;
     }
-
-    /**
-     * Returns a filter function that expects the source map as an input and returns
-     * the filtered map.
-     */
-    public Function<Map<String, ?>, Map<String, Object>> getFilter() {
-        if (filter == null) {
-            filter = XContentMapValues.filter(includes, excludes);
-        }
-        return filter;
-    }
 }

+ 15 - 15
server/src/main/java/org/elasticsearch/search/fetch/subphase/FetchSourcePhase.java

@@ -14,6 +14,7 @@ import org.elasticsearch.search.fetch.FetchContext;
 import org.elasticsearch.search.fetch.FetchSubPhase;
 import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
 import org.elasticsearch.search.lookup.Source;
+import org.elasticsearch.search.lookup.SourceFilter;
 
 import java.util.Map;
 
@@ -26,6 +27,7 @@ public final class FetchSourcePhase implements FetchSubPhase {
         }
         String index = fetchContext.getIndexName();
         assert fetchSourceContext.fetchSource();
+        SourceFilter sourceFilter = fetchSourceContext.filter();
 
         return new FetchSubPhaseProcessor() {
             private int fastPath;
@@ -38,7 +40,7 @@ public final class FetchSourcePhase implements FetchSubPhase {
             @Override
             public void process(HitContext hitContext) {
                 if (fetchContext.getSearchExecutionContext().isSourceEnabled() == false) {
-                    if (containsFilters(fetchSourceContext)) {
+                    if (fetchSourceContext.hasFilter()) {
                         throw new IllegalArgumentException(
                             "unable to fetch fields from _source field: _source is disabled in the mappings for index [" + index + "]"
                         );
@@ -53,18 +55,18 @@ public final class FetchSourcePhase implements FetchSubPhase {
                 Source source = hitContext.source();
 
                 // If this is a parent document and there are no source filters, then add the source as-is.
-                if (nestedHit == false && containsFilters(fetchSourceContext) == false) {
+                if (nestedHit == false && fetchSourceContext.hasFilter() == false) {
                     hitContext.hit().sourceRef(source.internalSourceRef());
                     fastPath++;
                     return;
                 }
 
                 // Otherwise, filter the source and add it to the hit.
-                Map<String, Object> value = source.filter(fetchSourceContext);
+                source = source.filter(sourceFilter);
                 if (nestedHit) {
-                    value = getNestedSource(value, hitContext);
+                    source = extractNested(source, hitContext.hit().getNestedIdentity());
                 }
-                hitContext.hit().sourceRef(Source.fromMap(value, source.sourceContentType()).internalSourceRef());
+                hitContext.hit().sourceRef(source.internalSourceRef());
             }
 
             @Override
@@ -74,18 +76,16 @@ public final class FetchSourcePhase implements FetchSubPhase {
         };
     }
 
-    private static boolean containsFilters(FetchSourceContext context) {
-        return context.includes().length != 0 || context.excludes().length != 0;
-    }
-
     @SuppressWarnings("unchecked")
-    private static Map<String, Object> getNestedSource(Map<String, Object> sourceAsMap, HitContext hitContext) {
-        for (SearchHit.NestedIdentity o = hitContext.hit().getNestedIdentity(); o != null; o = o.getChild()) {
-            sourceAsMap = (Map<String, Object>) sourceAsMap.get(o.getField().string());
-            if (sourceAsMap == null) {
-                return null;
+    private static Source extractNested(Source in, SearchHit.NestedIdentity nestedIdentity) {
+        Map<String, Object> sourceMap = in.source();
+        while (nestedIdentity != null) {
+            sourceMap = (Map<String, Object>) sourceMap.get(nestedIdentity.getField().string());
+            if (sourceMap == null) {
+                return Source.empty(in.sourceContentType());
             }
+            nestedIdentity = nestedIdentity.getChild();
         }
-        return sourceAsMap;
+        return Source.fromMap(sourceMap, in.sourceContentType());
     }
 }

+ 44 - 13
server/src/main/java/org/elasticsearch/search/lookup/Source.java

@@ -14,7 +14,6 @@ import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
-import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -51,6 +50,11 @@ public interface Source {
      */
     BytesReference internalSourceRef();
 
+    /**
+     * Apply a filter to this source, returning a new Source
+     */
+    Source filter(SourceFilter sourceFilter);
+
     /**
      * For the provided path, return its value in the source.
      *
@@ -67,30 +71,34 @@ public interface Source {
     }
 
     /**
-     * Apply a filter to this source, returning a new map representation
+     * An empty Source, represented as an empty map
      */
-    default Map<String, Object> filter(FetchSourceContext context) {
-        return context.getFilter().apply(source());
+    static Source empty(XContentType xContentType) {
+        return Source.fromMap(Map.of(), xContentType == null ? XContentType.JSON : xContentType);
     }
 
     /**
-     * An empty Source, represented as an empty json map
+     * Build a Source from a bytes representation with an unknown XContentType
      */
-    Source EMPTY = Source.fromMap(Map.of(), XContentType.JSON);
+    static Source fromBytes(BytesReference bytes) {
+        return fromBytes(bytes, null);
+    }
 
     /**
-     * Build a Source from a bytes representation
+     * Build a Source from a bytes representation with a known XContentType
      */
-    static Source fromBytes(BytesReference bytes) {
-        if (bytes == null) {
-            return EMPTY;
+    @SuppressWarnings("deprecation")
+    static Source fromBytes(BytesReference bytes, XContentType type) {
+        if (bytes == null || bytes.length() == 0) {
+            return empty(type);
         }
+        assert type == null || type.xContent() == XContentHelper.xContentType(bytes).xContent()
+            : "unexpected type " + type.xContent() + " expecting " + XContentHelper.xContentType(bytes).xContent();
         return new Source() {
 
             Map<String, Object> asMap = null;
-            XContentType xContentType = null;
+            XContentType xContentType = type;
 
-            @SuppressWarnings("deprecation")
             private void parseBytes() {
                 Tuple<XContentType, Map<String, Object>> t = XContentHelper.convertToMap(bytes, true);
                 this.xContentType = t.v1();
@@ -100,7 +108,7 @@ public interface Source {
             @Override
             public XContentType sourceContentType() {
                 if (xContentType == null) {
-                    parseBytes();
+                    xContentType = XContentHelper.xContentType(bytes);
                 }
                 return xContentType;
             }
@@ -117,6 +125,16 @@ public interface Source {
             public BytesReference internalSourceRef() {
                 return bytes;
             }
+
+            @Override
+            public Source filter(SourceFilter sourceFilter) {
+                // If we've already parsed to a map, then filter using that; but if we can
+                // filter without reifying the bytes then that will perform better.
+                if (asMap != null) {
+                    return sourceFilter.filterMap(this);
+                }
+                return sourceFilter.filterBytes(this);
+            }
         };
     }
 
@@ -146,6 +164,11 @@ public interface Source {
                 return mapToBytes(sourceMap, xContentType);
             }
 
+            @Override
+            public Source filter(SourceFilter sourceFilter) {
+                return sourceFilter.filterMap(this);
+            }
+
             private static BytesReference mapToBytes(Map<String, Object> value, XContentType xContentType) {
                 BytesStreamOutput streamOutput = new BytesStreamOutput(1024);
                 try {
@@ -190,6 +213,14 @@ public interface Source {
                 }
                 return inner.internalSourceRef();
             }
+
+            @Override
+            public Source filter(SourceFilter sourceFilter) {
+                if (inner == null) {
+                    inner = sourceSupplier.get();
+                }
+                return inner.filter(sourceFilter);
+            }
         };
     }
 

+ 108 - 0
server/src/main/java/org/elasticsearch/search/lookup/SourceFilter.java

@@ -0,0 +1,108 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.lookup;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.xcontent.XContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Implements source filtering based on a list of included and excluded fields.  To use,
+ * construct a SourceFilter and pass it to {@link Source#filter(SourceFilter)}
+ */
+public final class SourceFilter {
+
+    private Function<Map<String, Object>, Map<String, Object>> mapFilter = null;
+    private Function<Source, Source> bytesFilter = null;
+
+    private final boolean canFilterBytes;
+    private final boolean empty;
+    private final String[] includes;
+    private final String[] excludes;
+
+    /**
+     * Construct a new filter based on a list of includes and excludes
+     * @param includes  an array of fields to include (may be null)
+     * @param excludes  an array of fields to exclude (may be null)
+     */
+    public SourceFilter(String[] includes, String[] excludes) {
+        this.includes = includes == null ? Strings.EMPTY_ARRAY : includes;
+        this.excludes = excludes == null ? Strings.EMPTY_ARRAY : excludes;
+        // TODO: Remove this once we upgrade to Jackson 2.14. There is currently a bug
+        // in exclude filtering if one of the excludes contains a wildcard '*'.
+        // see https://github.com/FasterXML/jackson-core/pull/729
+        this.canFilterBytes = CollectionUtils.isEmpty(excludes) || Arrays.stream(excludes).noneMatch(field -> field.contains("*"));
+        this.empty = CollectionUtils.isEmpty(this.includes) && CollectionUtils.isEmpty(this.excludes);
+    }
+
+    /**
+     * Filter a Source using its map representation
+     */
+    public Source filterMap(Source in) {
+        if (this.empty) {
+            return in;
+        }
+        if (mapFilter == null) {
+            mapFilter = XContentMapValues.filter(includes, excludes);
+        }
+        return Source.fromMap(mapFilter.apply(in.source()), in.sourceContentType());
+    }
+
+    /**
+     * Filter a Source using its bytes representation
+     */
+    public Source filterBytes(Source in) {
+        if (this.empty) {
+            return in;
+        }
+        if (bytesFilter == null) {
+            bytesFilter = buildBytesFilter();
+        }
+        return bytesFilter.apply(in);
+    }
+
+    private Function<Source, Source> buildBytesFilter() {
+        if (canFilterBytes == false) {
+            return this::filterMap;
+        }
+        final XContentParserConfiguration parserConfig = XContentParserConfiguration.EMPTY.withFiltering(
+            Set.copyOf(Arrays.asList(includes)),
+            Set.copyOf(Arrays.asList(excludes)),
+            true
+        );
+        return in -> {
+            try {
+                BytesStreamOutput streamOutput = new BytesStreamOutput(1024);
+                XContent xContent = in.sourceContentType().xContent();
+                XContentBuilder builder = new XContentBuilder(xContent, streamOutput);
+                XContentParser parser = xContent.createParser(parserConfig, in.internalSourceRef().streamInput());
+                if ((parser.currentToken() == null) && (parser.nextToken() == null)) {
+                    return Source.empty(in.sourceContentType());
+                }
+                builder.copyCurrentStructure(parser);
+                return Source.fromBytes(BytesReference.bytes(builder));
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        };
+    }
+}

+ 5 - 0
server/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java

@@ -70,6 +70,11 @@ public class SourceLookup implements Source, Map<String, Object> {
         return sourceProvider.sourceAsBytes();
     }
 
+    @Override
+    public Source filter(SourceFilter sourceFilter) {
+        return sourceFilter.filterMap(this);
+    }
+
     /**
      * Checks if the source has been deserialized as a {@link Map} of java objects.
      */

+ 19 - 36
server/src/test/java/org/elasticsearch/common/xcontent/support/XContentFieldFilterTests.java → server/src/test/java/org/elasticsearch/common/xcontent/support/XContentSourceFilterTests.java

@@ -11,20 +11,17 @@ package org.elasticsearch.common.xcontent.support;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.xcontent.XContentFieldFilter;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.CheckedFunction;
+import org.elasticsearch.search.lookup.Source;
+import org.elasticsearch.search.lookup.SourceFilter;
 import org.elasticsearch.xcontent.ToXContentObject;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,7 +33,7 @@ import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
 import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
 
-public class XContentFieldFilterTests extends AbstractFilteringTestCase {
+public class XContentSourceFilterTests extends AbstractFilteringTestCase {
     @Override
     protected void testFilter(Builder expected, Builder actual, Collection<String> includes, Collection<String> excludes)
         throws IOException {
@@ -54,9 +51,12 @@ public class XContentFieldFilterTests extends AbstractFilteringTestCase {
         } else {
             sourceExcludes = excludes.toArray(new String[excludes.size()]);
         }
-        XContentFieldFilter filter = XContentFieldFilter.newFieldFilter(sourceIncludes, sourceExcludes);
-        BytesReference ref = filter.apply(toBytesReference(actual, xContentType, humanReadable), xContentType);
-        assertMap(XContentHelper.convertToMap(ref, true, xContentType).v2(), matchesMap(toMap(expected, xContentType, humanReadable)));
+        SourceFilter filter = new SourceFilter(sourceIncludes, sourceExcludes);
+        Source filtered = Source.fromBytes(toBytesReference(actual, xContentType, humanReadable), xContentType).filter(filter);
+        assertMap(
+            XContentHelper.convertToMap(filtered.internalSourceRef(), true, xContentType).v2(),
+            matchesMap(toMap(expected, xContentType, humanReadable))
+        );
     }
 
     private void testFilter(String expectedJson, String actualJson, Collection<String> includes, Collection<String> excludes)
@@ -538,33 +538,16 @@ public class XContentFieldFilterTests extends AbstractFilteringTestCase {
     }
 
     public void testEmptySource() throws IOException {
-        final CheckedFunction<XContentType, BytesReference, IOException> emptyValueSupplier = xContentType -> {
-            BytesStreamOutput bStream = new BytesStreamOutput();
-            XContentBuilder builder = XContentFactory.contentBuilder(xContentType, bStream).map(Collections.emptyMap());
-            builder.close();
-            return bStream.bytes();
-        };
-        final XContentType xContentType = randomFrom(XContentType.values());
-        // null value for parser filter
-        assertEquals(
-            emptyValueSupplier.apply(xContentType),
-            XContentFieldFilter.newFieldFilter(new String[0], new String[0]).apply(null, xContentType)
-        );
-        // empty bytes for parser filter
-        assertEquals(
-            emptyValueSupplier.apply(xContentType),
-            XContentFieldFilter.newFieldFilter(new String[0], new String[0]).apply(BytesArray.EMPTY, xContentType)
-        );
-        // null value for map filter
-        assertEquals(
-            emptyValueSupplier.apply(xContentType),
-            XContentFieldFilter.newFieldFilter(new String[0], new String[] { "test*" }).apply(null, xContentType)
-        );
-        // empty bytes for map filter
-        assertEquals(
-            emptyValueSupplier.apply(xContentType),
-            XContentFieldFilter.newFieldFilter(new String[0], new String[] { "test*" }).apply(BytesArray.EMPTY, xContentType)
-        );
+        SourceFilter empty = new SourceFilter(new String[0], new String[0]);
+        SourceFilter excludeWildcard = new SourceFilter(new String[0], new String[] { "test* " });
+        for (XContentType xContentType : XContentType.values()) {
+            assertEquals(Map.of(), Source.fromBytes(null, xContentType).filter(empty).source());
+            assertEquals(Map.of(), Source.fromBytes(null, xContentType).filter(excludeWildcard).source());
+            assertEquals(Map.of(), Source.fromBytes(BytesArray.EMPTY, xContentType).filter(empty).source());
+            assertEquals(Map.of(), Source.fromBytes(BytesArray.EMPTY, xContentType).filter(excludeWildcard).source());
+            assertEquals(Map.of(), Source.fromMap(null, xContentType).filter(empty).source());
+            assertEquals(Map.of(), Source.fromMap(Map.of(), xContentType).filter(excludeWildcard).source());
+        }
     }
 
     private BytesReference toBytesReference(Builder builder, XContentType xContentType, boolean humanReadable) throws IOException {

+ 1 - 1
server/src/test/java/org/elasticsearch/search/fetch/subphase/FetchFieldsPhaseTests.java

@@ -78,7 +78,7 @@ public class FetchFieldsPhaseTests extends ESTestCase {
             processor.setNextReader(context);
             for (int doc = 0; doc < context.reader().maxDoc(); doc++) {
                 SearchHit searchHit = new SearchHit(doc + context.docBase);
-                processor.process(new FetchSubPhase.HitContext(searchHit, context, doc, Source.EMPTY));
+                processor.process(new FetchSubPhase.HitContext(searchHit, context, doc, Source.empty(null)));
                 assertNotNull(searchHit.getFields().get("field"));
             }
         }

+ 1 - 1
server/src/test/java/org/elasticsearch/search/fetch/subphase/FetchSourcePhaseTests.java

@@ -175,7 +175,7 @@ public class FetchSourcePhaseTests extends ESTestCase {
         // We don't need a real index, just a LeafReaderContext which cannot be mocked.
         MemoryIndex index = new MemoryIndex();
         LeafReaderContext leafReaderContext = index.createSearcher().getIndexReader().leaves().get(0);
-        Source source = sourceBuilder == null ? Source.EMPTY : Source.fromBytes(BytesReference.bytes(sourceBuilder));
+        Source source = sourceBuilder == null ? Source.empty(null) : Source.fromBytes(BytesReference.bytes(sourceBuilder));
         HitContext hitContext = new HitContext(searchHit, leafReaderContext, 1, source);
 
         FetchSourcePhase phase = new FetchSourcePhase();

+ 114 - 0
server/src/test/java/org/elasticsearch/search/lookup/SourceFilterTests.java

@@ -0,0 +1,114 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.lookup;
+
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.util.List;
+import java.util.Map;
+
+public class SourceFilterTests extends ESTestCase {
+
+    public void testEmptyFiltering() {
+        Source s = Source.fromMap(Map.of("field", "value"), XContentType.JSON);
+        Source filtered = s.filter(new SourceFilter(new String[] {}, new String[] {}));
+        assertSame(s, filtered);
+    }
+
+    public void testSimpleInclude() {
+        Source s = Source.fromBytes(new BytesArray("""
+            { "field1" : "value1", "field2" : "value2" }"""));
+        Source filtered = s.filter(new SourceFilter(new String[] { "field2" }, new String[] {}));
+        assertTrue(filtered.source().containsKey("field2"));
+        assertEquals("value2", filtered.source().get("field2"));
+        assertFalse(filtered.source().containsKey("field1"));
+    }
+
+    public void testSimpleExclude() {
+        Source s = Source.fromBytes(new BytesArray("""
+            { "field1" : "value1", "field2" : "value2" }"""));
+        Source filtered = s.filter(new SourceFilter(new String[] {}, new String[] { "field1" }));
+        assertTrue(filtered.source().containsKey("field2"));
+        assertEquals("value2", filtered.source().get("field2"));
+        assertFalse(filtered.source().containsKey("field1"));
+    }
+
+    public void testCombinedIncludesAndExcludes() {
+        Source s = Source.fromBytes(new BytesArray("""
+            {
+              "requests": {
+                "count": 10,
+                "foo": "bar"
+              },
+              "meta": {
+                "name": "Some metric",
+                "description": "Some metric description",
+                "other": {
+                  "foo": "one",
+                  "baz": "two"
+                }
+              }
+            }
+            """));
+        SourceFilter sourceFilter = new SourceFilter(
+            new String[] { "*.count", "meta.*" },
+            new String[] { "meta.description", "meta.other.*" }
+        );
+
+        s = s.filter(sourceFilter);
+        Map<String, Object> expected = Map.of("requests", Map.of("count", 10), "meta", Map.of("name", "Some metric", "other", Map.of()));
+        assertEquals(expected, s.source());
+    }
+
+    public void testExcludeWithWildcards() {
+        Source s = Source.fromBytes(new BytesArray("""
+            { "field1" : "value1", "array_field" : [ "value2" ] }"""));
+        Source filtered = s.filter(new SourceFilter(new String[] {}, new String[] { "array*" }));
+        assertTrue(filtered.source().containsKey("field1"));
+        assertEquals("value1", filtered.source().get("field1"));
+        assertFalse(filtered.source().containsKey("array_field"));
+    }
+
+    public void testExcludeWithWildcardsUsesMap() {
+
+        Source s = new Source() {
+            @Override
+            public XContentType sourceContentType() {
+                return XContentType.JSON;
+            }
+
+            @Override
+            public Map<String, Object> source() {
+                return Map.of("field", "value", "array_field", List.of("value1", "value2"));
+            }
+
+            @Override
+            public BytesReference internalSourceRef() {
+                throw new AssertionError("SourceFilter with '*' in excludes list should filter on map");
+            }
+
+            @Override
+            public Source filter(SourceFilter sourceFilter) {
+                // We call filterBytes explicitly here but the filter should re-route to
+                // using filterMap because it contains an exclude filter with a wildcard
+                return sourceFilter.filterBytes(this);
+            }
+        };
+
+        Source filtered = s.filter(new SourceFilter(new String[] {}, new String[] { "array*" }));
+        assertTrue(filtered.source().containsKey("field"));
+        assertEquals("value", filtered.source().get("field"));
+        assertFalse(filtered.source().containsKey("array_field"));
+
+    }
+
+}