[ML] frequent items filter (#91137)

Add a filter to the frequent items aggregation that excludes documents from the analysis while still calculating support on the full document set.

The filter is specified at the top level of frequent_items:

"frequent_items": {
  "filter": {
    "term": {
      "host.name.keyword": "i-12345"
    }
  },
...

The filter above excludes non-matching documents from the item set analysis, but those documents still count when
support is calculated. In contrast, a query specified at the top level finds the same item sets, but their support no
longer reflects their importance relative to the full document set.
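
The difference between the two denominators can be illustrated with a small worked example. This is only a sketch under stated assumptions: the `Doc` class, the sample data and the helper names are hypothetical, and the brute-force counting stands in for the real item set mining, which this commit does not change.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class FilterSupportSketch {

    /** Hypothetical document: a host name plus the items found in it. */
    static final class Doc {
        final String host;
        final Set<String> items;

        Doc(String host, Set<String> items) {
            this.host = host;
            this.items = items;
        }
    }

    /** Counts documents that pass the predicate and contain every item of the set. */
    static long matches(List<Doc> docs, Predicate<Doc> predicate, Set<String> itemSet) {
        return docs.stream().filter(predicate).filter(d -> d.items.containsAll(itemSet)).count();
    }

    /** Aggregation-level filter: the denominator is the full document set. */
    static double supportWithFilter(List<Doc> docs, Predicate<Doc> filter, Set<String> itemSet) {
        return (double) matches(docs, filter, itemSet) / docs.size();
    }

    /** Top-level query: the denominator shrinks to the documents that match the query. */
    static double supportWithQuery(List<Doc> docs, Predicate<Doc> query, Set<String> itemSet) {
        long remaining = docs.stream().filter(query).count();
        return remaining == 0 ? 0.0 : (double) matches(docs, query, itemSet) / remaining;
    }

    /** Made-up sample data: two documents per host. */
    static List<Doc> sampleDocs() {
        return List.of(
            new Doc("i-12345", Set.of("a", "b")),
            new Doc("i-12345", Set.of("a", "b", "c")),
            new Doc("i-99999", Set.of("a", "b")),
            new Doc("i-99999", Set.of("c"))
        );
    }

    public static void main(String[] args) {
        Predicate<Doc> onHost = d -> d.host.equals("i-12345");
        Set<String> itemSet = Set.of("a", "b");
        System.out.println(supportWithFilter(sampleDocs(), onHost, itemSet)); // 2 of 4 documents
        System.out.println(supportWithQuery(sampleDocs(), onHost, itemSet));  // 2 of 2 documents
    }
}
```

With the filter, the item set is found in 2 of 4 documents (support 0.5). With the same condition as a top-level query, it is found in 2 of 2 documents (support 1.0), which hides how rare the host is in the full data set.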
Hendrik Muhs 3 years ago
parent
commit
14b2d2d37e
13 files changed with 249 additions and 34 deletions
  1. docs/changelog/91137.yaml (+5 -0)
  2. docs/reference/aggregations/bucket/frequent-items-aggregation.asciidoc (+53 -2)
  3. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/EclatMapReducer.java (+9 -0)
  4. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/FrequentItemSetsAggregationBuilder.java (+32 -7)
  5. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/FrequentItemSetsAggregatorFactory.java (+7 -2)
  6. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/HashBasedTransactionStore.java (+17 -1)
  7. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/ImmutableTransactionStore.java (+19 -1)
  8. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/TransactionStore.java (+7 -0)
  9. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/AbstractItemSetMapReducer.java (+9 -0)
  10. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java (+36 -11)
  11. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/FrequentItemSetsAggregationBuilderTests.java (+13 -6)
  12. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/FrequentItemSetsAggregatorTests.java (+8 -4)
  13. x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/frequent_items_agg.yml (+34 -0)

+ 5 - 0
docs/changelog/91137.yaml

@@ -0,0 +1,5 @@
+pr: 91137
+summary: Add a filter parameter to frequent items
+area: Machine Learning
+type: enhancement
+issues: []

+ 53 - 2
docs/reference/aggregations/bucket/frequent-items-aggregation.asciidoc

@@ -51,6 +51,7 @@ A `frequent_items` aggregation looks like this in isolation:
 |`minimum_set_size` | (integer) The <<frequent-items-minimum-set-size,minimum size>> of one item set. | Optional | `1`
 |`minimum_support` | (integer) The <<frequent-items-minimum-support,minimum support>> of one item set. | Optional | `0.1`
 |`size` | (integer) The number of top item sets to return. | Optional | `10`
+|`filter` | (object) Query that filters documents from the analysis | Optional | `match_all`
 |===
 
 
@@ -102,6 +103,18 @@ parameter has a significant effect on the required memory and the runtime of the
 aggregation.
 
 
+[discrete]
+[[frequent-items-filter]]
+==== Filter
+
+A query that filters the documents used in the analysis. Documents that
+don't match the filter are ignored when generating the item sets, but still
+count when calculating the support of an item set.
+
+Use the filter if you want to narrow the item set analysis to fields of interest.
+Use a top-level query to filter the data set.
+
+
 [discrete]
 [[frequent-items-example]]
 ==== Examples
@@ -123,7 +136,7 @@ example.
 
 [source,console]
 -------------------------------------------------
-POST /kibana_sample_data_ecommerce /_async_search 
+POST /kibana_sample_data_ecommerce/_async_search
 {
   "size": 0,
   "aggs": {
@@ -224,7 +237,45 @@ from New York. Finally, the item set with the third highest support is
 
 
 [discrete]
-==== Analizing numeric values by using a runtime field
+==== Aggregation with two analyzed fields and a filter
+
+We take the first example, but want to narrow the item sets to places in Europe.
+For that we add a filter:
+
+[source,console]
+-------------------------------------------------
+POST /kibana_sample_data_ecommerce/_async_search
+{
+  "size": 0,
+  "aggs": {
+    "my_agg": {
+      "frequent_items": {
+        "minimum_set_size": 3,
+        "fields": [
+          { "field": "category.keyword" },
+          { "field": "geoip.city_name" }
+        ],
+        "size": 3,
+        "filter": {
+          "term": {
+            "geoip.continent_name": "Europe"
+          }
+        }
+      }
+    }
+  }
+}
+-------------------------------------------------
+// TEST[skip:setup kibana sample data]
+
+The result only shows item sets created from documents that match the
+filter, namely purchases in Europe. With `filter`, the calculated `support` still
+takes all purchases into account. That's different from specifying a query at the
+top level, in which case `support` is calculated only from purchases in Europe.
+
+
+[discrete]
+==== Analyzing numeric values by using a runtime field
 
 The frequent items aggregation enables you to bucket numeric values by using 
 <<runtime,runtime fields>>. The next example demonstrates how to use a script to 

+ 9 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/EclatMapReducer.java

@@ -188,6 +188,12 @@ public final class EclatMapReducer extends AbstractItemSetMapReducer<
         return transactionStore;
     }
 
+    @Override
+    public HashBasedTransactionStore mapFiltered(HashBasedTransactionStore transactionStore) {
+        transactionStore.addFilteredTransaction();
+        return transactionStore;
+    }
+
     @Override
     protected ImmutableTransactionStore mapFinalize(HashBasedTransactionStore transactionStore) {
 
@@ -197,6 +203,7 @@ public final class EclatMapReducer extends AbstractItemSetMapReducer<
             profilingInfoMap.put("ram_bytes_transactionstore_after_map", transactionStore.ramBytesUsed());
             profilingInfoMap.put("total_items_after_map", transactionStore.getTotalItemCount());
             profilingInfoMap.put("total_transactions_after_map", transactionStore.getTotalTransactionCount());
+            profilingInfoMap.put("filtered_transactions_after_map", transactionStore.getFilteredTransactionCount());
             profilingInfoMap.put("unique_items_after_map", transactionStore.getUniqueItemsCount());
             profilingInfoMap.put("unique_transactions_after_map", transactionStore.getUniqueTransactionCount());
         }
@@ -283,6 +290,7 @@ public final class EclatMapReducer extends AbstractItemSetMapReducer<
             profilingInfoReduce.put("ram_bytes_transactionstore_after_reduce", transactionStore.ramBytesUsed());
             profilingInfoReduce.put("total_items_after_reduce", transactionStore.getTotalItemCount());
             profilingInfoReduce.put("total_transactions_after_reduce", transactionStore.getTotalTransactionCount());
+            profilingInfoReduce.put("filtered_transactions_after_reduce", transactionStore.getFilteredTransactionCount());
             profilingInfoReduce.put("unique_items_after_reduce", transactionStore.getUniqueItemsCount());
             profilingInfoReduce.put("unique_transactions_after_reduce", transactionStore.getUniqueTransactionCount());
         }
@@ -293,6 +301,7 @@ public final class EclatMapReducer extends AbstractItemSetMapReducer<
             profilingInfoReduce.put("ram_bytes_transactionstore_after_prune", transactionStore.ramBytesUsed());
             profilingInfoReduce.put("total_items_after_prune", transactionStore.getTotalItemCount());
             profilingInfoReduce.put("total_transactions_after_prune", transactionStore.getTotalTransactionCount());
+            profilingInfoReduce.put("filtered_transactions_after_prune", transactionStore.getFilteredTransactionCount());
             profilingInfoReduce.put("unique_items_after_prune", transactionStore.getUniqueItemsCount());
             profilingInfoReduce.put("unique_transactions_after_prune", transactionStore.getUniqueTransactionCount());
         }

+ 32 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/FrequentItemSetsAggregationBuilder.java

@@ -10,6 +10,8 @@ package org.elasticsearch.xpack.ml.aggs.frequentitemsets;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.query.AbstractQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
@@ -21,6 +23,7 @@ import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfi
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ContextParser;
+import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.ml.aggs.frequentitemsets.mr.ItemSetMapReduceValueSource;
@@ -50,22 +53,29 @@ public final class FrequentItemSetsAggregationBuilder extends AbstractAggregatio
             double minimumSupport = args[1] == null ? DEFAULT_MINIMUM_SUPPORT : (double) args[1];
             int minimumSetSize = args[2] == null ? DEFAULT_MINIMUM_SET_SIZE : (int) args[2];
             int size = args[3] == null ? DEFAULT_SIZE : (int) args[3];
+            QueryBuilder filter = (QueryBuilder) args[4];
 
-            return new FrequentItemSetsAggregationBuilder(context, fields, minimumSupport, minimumSetSize, size);
+            return new FrequentItemSetsAggregationBuilder(context, fields, minimumSupport, minimumSetSize, size, filter);
         }
     );
 
     static {
-        ContextParser<Void, MultiValuesSourceFieldConfig.Builder> metricParser = MultiValuesSourceFieldConfig.parserBuilder(
+        ContextParser<Void, MultiValuesSourceFieldConfig.Builder> fieldsParser = MultiValuesSourceFieldConfig.parserBuilder(
             false,  // scriptable
             false,  // timezone aware
-            false,  // filtered
+            false,  // filtered (not defined per field, but for all fields below)
             false   // format
         );
-        PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, n) -> metricParser.parse(p, null).build(), FIELDS);
+        PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, n) -> fieldsParser.parse(p, null).build(), FIELDS);
         PARSER.declareDouble(ConstructingObjectParser.optionalConstructorArg(), MINIMUM_SUPPORT);
         PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MINIMUM_SET_SIZE);
         PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), Aggregation.CommonFields.SIZE);
+        PARSER.declareField(
+            ConstructingObjectParser.optionalConstructorArg(),
+            (p, context) -> AbstractQueryBuilder.parseTopLevelQuery(p),
+            MultiValuesSourceFieldConfig.FILTER,
+            ObjectParser.ValueType.OBJECT
+        );
     }
 
     static final ValuesSourceRegistry.RegistryKey<ItemSetMapReduceValueSource.ValueSourceSupplier> REGISTRY_KEY =
@@ -92,13 +102,15 @@ public final class FrequentItemSetsAggregationBuilder extends AbstractAggregatio
     private final double minimumSupport;
     private final int minimumSetSize;
     private final int size;
+    private final QueryBuilder filter;
 
     public FrequentItemSetsAggregationBuilder(
         String name,
         List<MultiValuesSourceFieldConfig> fields,
         double minimumSupport,
         int minimumSetSize,
-        int size
+        int size,
+        QueryBuilder filter
     ) {
         super(name);
         this.fields = fields;
@@ -118,6 +130,7 @@ public final class FrequentItemSetsAggregationBuilder extends AbstractAggregatio
             throw new IllegalArgumentException("[size] must be greater than 0. Found [" + size + "] in [" + name + "]");
         }
         this.size = size;
+        this.filter = filter;
     }
 
     public FrequentItemSetsAggregationBuilder(StreamInput in) throws IOException {
@@ -126,6 +139,11 @@ public final class FrequentItemSetsAggregationBuilder extends AbstractAggregatio
         this.minimumSupport = in.readDouble();
         this.minimumSetSize = in.readVInt();
         this.size = in.readVInt();
+        if (in.getVersion().onOrAfter(Version.V_8_6_0)) {
+            this.filter = in.readOptionalNamedWriteable(QueryBuilder.class);
+        } else {
+            this.filter = null;
+        }
     }
 
     @Override
@@ -135,7 +153,7 @@ public final class FrequentItemSetsAggregationBuilder extends AbstractAggregatio
 
     @Override
     protected AggregationBuilder shallowCopy(Builder factoriesBuilder, Map<String, Object> metadata) {
-        return new FrequentItemSetsAggregationBuilder(name, fields, minimumSupport, minimumSetSize, size);
+        return new FrequentItemSetsAggregationBuilder(name, fields, minimumSupport, minimumSetSize, size, filter);
     }
 
     @Override
@@ -149,6 +167,9 @@ public final class FrequentItemSetsAggregationBuilder extends AbstractAggregatio
         out.writeDouble(minimumSupport);
         out.writeVInt(minimumSetSize);
         out.writeVInt(size);
+        if (out.getVersion().onOrAfter(Version.V_8_6_0)) {
+            out.writeOptionalNamedWriteable(filter);
+        }
     }
 
     @Override
@@ -164,7 +185,8 @@ public final class FrequentItemSetsAggregationBuilder extends AbstractAggregatio
             fields,
             minimumSupport,
             minimumSetSize,
-            size
+            size,
+            filter
         );
     }
 
@@ -179,6 +201,9 @@ public final class FrequentItemSetsAggregationBuilder extends AbstractAggregatio
         builder.field(MINIMUM_SUPPORT.getPreferredName(), minimumSupport);
         builder.field(MINIMUM_SET_SIZE.getPreferredName(), minimumSetSize);
         builder.field(Aggregation.CommonFields.SIZE.getPreferredName(), size);
+        if (filter != null) {
+            builder.field(MultiValuesSourceFieldConfig.FILTER.getPreferredName(), filter);
+        }
         builder.endObject();
         return builder;
     }

+ 7 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/FrequentItemSetsAggregatorFactory.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.ml.aggs.frequentitemsets;
 
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.Aggregator;
@@ -59,6 +60,7 @@ public class FrequentItemSetsAggregatorFactory extends AggregatorFactory {
     private final double minimumSupport;
     private final int minimumSetSize;
     private final int size;
+    private final QueryBuilder filter;
 
     public FrequentItemSetsAggregatorFactory(
         String name,
@@ -69,13 +71,15 @@ public class FrequentItemSetsAggregatorFactory extends AggregatorFactory {
         List<MultiValuesSourceFieldConfig> fields,
         double minimumSupport,
         int minimumSetSize,
-        int size
+        int size,
+        QueryBuilder filter
     ) throws IOException {
         super(name, context, parent, subFactoriesBuilder, metadata);
         this.fields = fields;
         this.minimumSupport = minimumSupport;
         this.minimumSetSize = minimumSetSize;
         this.size = size;
+        this.filter = filter;
     }
 
     @Override
@@ -109,7 +113,8 @@ public class FrequentItemSetsAggregatorFactory extends AggregatorFactory {
                 parent,
                 metadata,
                 new EclatMapReducer(FrequentItemSetsAggregationBuilder.NAME, minimumSupport, minimumSetSize, size, context.profiling()),
-                configs
+                configs,
+                filter
             ) {
         };
     }

+ 17 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/HashBasedTransactionStore.java

@@ -96,6 +96,7 @@ public final class HashBasedTransactionStore extends TransactionStore {
     private BytesRefHash transactions;
     private LongArray transactionCounts;
     private long totalTransactionCount;
+    private long filteredTransactionCount;
 
     public HashBasedTransactionStore(BigArrays bigArrays) {
         super(bigArrays);
@@ -209,6 +210,14 @@ public final class HashBasedTransactionStore extends TransactionStore {
         transactionCounts.increment(id, 1);
     }
 
+    /**
+     * Report a filtered transaction to the store.
+     */
+    public void addFilteredTransaction() {
+        ++filteredTransactionCount;
+        ++totalTransactionCount;
+    }
+
     @Override
     public long getTotalItemCount() {
         return totalItemCount;
@@ -219,6 +228,11 @@ public final class HashBasedTransactionStore extends TransactionStore {
         return totalTransactionCount;
     }
 
+    @Override
+    public long getFilteredTransactionCount() {
+        return filteredTransactionCount;
+    }
+
     @Override
     public BytesRefArray getItems() {
         return items.getBytesRefs();
@@ -292,6 +306,7 @@ public final class HashBasedTransactionStore extends TransactionStore {
 
         totalItemCount += other.getTotalItemCount();
         totalTransactionCount += other.getTotalTransactionCount();
+        filteredTransactionCount += other.getFilteredTransactionCount();
     }
 
     /**
@@ -445,7 +460,8 @@ public final class HashBasedTransactionStore extends TransactionStore {
             totalItemCount,
             transactions.takeBytesRefsOwnership(),
             transactionCounts,
-            totalTransactionCount
+            totalTransactionCount,
+            filteredTransactionCount
         );
 
         items = null;
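
The counter bookkeeping in the hunks above can be sketched in isolation. The class below is a simplified stand-in, not the real `HashBasedTransactionStore`: it keeps only the two counters, and the `addTransaction()` and `analyzed()` helpers are hypothetical names added for illustration.

```java
final class TransactionCountersSketch {
    private long totalTransactionCount;
    private long filteredTransactionCount;

    /** A document that passed the filter: counted once, items would be stored here. */
    void addTransaction() {
        ++totalTransactionCount;
    }

    /** A document rejected by the filter: no items stored, but it still counts toward the total. */
    void addFilteredTransaction() {
        ++filteredTransactionCount;
        ++totalTransactionCount;
    }

    /** Merging two stores (for example from two shards) sums both counters. */
    void merge(TransactionCountersSketch other) {
        totalTransactionCount += other.totalTransactionCount;
        filteredTransactionCount += other.filteredTransactionCount;
    }

    long total() {
        return totalTransactionCount;
    }

    long filtered() {
        return filteredTransactionCount;
    }

    /** Hypothetical helper: documents that actually contributed items. */
    long analyzed() {
        return totalTransactionCount - filteredTransactionCount;
    }

    public static void main(String[] args) {
        TransactionCountersSketch shardA = new TransactionCountersSketch();
        shardA.addTransaction();
        shardA.addFilteredTransaction();
        TransactionCountersSketch shardB = new TransactionCountersSketch();
        shardB.addFilteredTransaction();
        shardA.merge(shardB);
        System.out.println(shardA.total() + " total, " + shardA.filtered() + " filtered");
    }
}
```

Because filtered documents increment the total as well, support stays relative to the full document set after the merge.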

+ 19 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/ImmutableTransactionStore.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.ml.aggs.frequentitemsets;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.BigArrays;
@@ -29,6 +30,7 @@ public final class ImmutableTransactionStore extends TransactionStore {
     private final BytesRefArray transactions;
     private final LongArray transactionCounts;
     private final long totalTransactionCount;
+    private final long filteredTransactionCount;
 
     // base size of sealed transaction store, update if you add classes
     private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ImmutableTransactionStore.class) + 2
@@ -42,7 +44,8 @@ public final class ImmutableTransactionStore extends TransactionStore {
         long totalItemCount,
         BytesRefArray transactions,
         LongArray transactionCounts,
-        long totalTransactionCount
+        long totalTransactionCount,
+        long filteredTransactionCount
     ) {
         super(bigArrays);
 
@@ -52,6 +55,7 @@ public final class ImmutableTransactionStore extends TransactionStore {
         this.transactions = transactions;
         this.transactionCounts = transactionCounts;
         this.totalTransactionCount = totalTransactionCount;
+        this.filteredTransactionCount = filteredTransactionCount;
     }
 
     public ImmutableTransactionStore(StreamInput in, BigArrays bigArrays) throws IOException {
@@ -78,6 +82,12 @@ public final class ImmutableTransactionStore extends TransactionStore {
             }
             this.totalTransactionCount = in.readVLong();
 
+            if (in.getVersion().onOrAfter(Version.V_8_6_0)) {
+                this.filteredTransactionCount = in.readVLong();
+            } else {
+                this.filteredTransactionCount = 0;
+            }
+
             success = true;
         } finally {
             if (false == success) {
@@ -116,6 +126,11 @@ public final class ImmutableTransactionStore extends TransactionStore {
         return totalTransactionCount;
     }
 
+    @Override
+    public long getFilteredTransactionCount() {
+        return filteredTransactionCount;
+    }
+
     @Override
     public long ramBytesUsed() {
         return super.ramBytesUsed() + BASE_RAM_BYTES_USED + +items.ramBytesUsed() + itemCounts.ramBytesUsed() + transactions.ramBytesUsed()
@@ -143,6 +158,9 @@ public final class ImmutableTransactionStore extends TransactionStore {
             out.writeVLong(transactionCounts.get(i));
         }
         out.writeVLong(totalTransactionCount);
+        if (out.getVersion().onOrAfter(Version.V_8_6_0)) {
+            out.writeVLong(filteredTransactionCount);
+        }
     }
 
 }
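
The version checks in the hunks above follow a common pattern for wire compatibility: write the new field only to peers that understand it, and fall back to a default when reading from older peers. A minimal sketch of that pattern using plain `java.io` streams follows. The integer version numbers and the class name are hypothetical and stand in for Elasticsearch's `Version`, `StreamInput` and `StreamOutput`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class VersionedWireSketch {
    /** Hypothetical wire version that introduced the filtered count. */
    static final int FILTER_VERSION = 2;

    /** Writes the total count, and the filtered count only if the peer understands it. */
    static byte[] write(long total, long filtered, int peerVersion) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(total);
            if (peerVersion >= FILTER_VERSION) {
                out.writeLong(filtered);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads {total, filtered}; streams from older peers default the filtered count to 0. */
    static long[] read(byte[] data, int peerVersion) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long total = in.readLong();
            long filtered = peerVersion >= FILTER_VERSION ? in.readLong() : 0L;
            return new long[] { total, filtered };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] fromNew = read(write(10, 6, 2), 2);
        long[] fromOld = read(write(10, 6, 1), 1);
        System.out.println(fromNew[1] + " vs " + fromOld[1]);
    }
}
```

Reader and writer must apply the same version test, otherwise the reader consumes bytes that were never written or leaves bytes unread.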

+ 7 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/TransactionStore.java

@@ -208,6 +208,13 @@ abstract class TransactionStore implements Writeable, Releasable, Accountable {
      */
     abstract long getTotalTransactionCount();
 
+    /**
+     * Get the number of filtered transactions
+     *
+     * @return count of filtered transactions
+     */
+    abstract long getFilteredTransactionCount();
+
     abstract BytesRefArray getItems();
 
     abstract LongArray getItemCounts();

+ 9 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/AbstractItemSetMapReducer.java

@@ -80,6 +80,15 @@ public abstract class AbstractItemSetMapReducer<
      */
     protected abstract MapContext map(Stream<Tuple<Field, List<Object>>> keyValues, MapContext mapContext);
 
+    /**
+     * Definition of the mapper for filtered values.
+     *
+     * @param mapContext context object for mapping
+     */
+    protected MapContext mapFiltered(MapContext mapContext) {
+        return mapContext;
+    }
+
     /**
      * Definition of code to execute(optional) after the mapper processed all input.
      *

+ 36 - 11
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java

@@ -7,11 +7,17 @@
 
 package org.elasticsearch.xpack.ml.aggs.frequentitemsets.mr;
 
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongObjectPagedHashMap;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.aggregations.AggregationExecutionContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorBase;
@@ -41,6 +47,7 @@ public abstract class ItemSetMapReduceAggregator<
     Result extends ToXContent & Writeable> extends AggregatorBase {
 
     private final List<ItemSetMapReduceValueSource> extractors;
+    private final Weight weightFilter;
     private final List<Field> fields;
     private final AbstractItemSetMapReducer<MapContext, MapFinalContext, ReduceContext, Result> mapReducer;
     private final BigArrays bigArraysForMapReduce;
@@ -55,13 +62,20 @@ public abstract class ItemSetMapReduceAggregator<
         Aggregator parent,
         Map<String, Object> metadata,
         AbstractItemSetMapReducer<MapContext, MapFinalContext, ReduceContext, Result> mapReducer,
-        List<ValuesSourceConfig> configs
+        List<ValuesSourceConfig> configs,
+        QueryBuilder filter
     ) throws IOException {
         super(name, AggregatorFactories.EMPTY, context, parent, CardinalityUpperBound.NONE, metadata);
 
         List<ItemSetMapReduceValueSource> extractors = new ArrayList<>();
         List<Field> fields = new ArrayList<>();
+        IndexSearcher contextSearcher = context.searcher();
+
         int id = 0;
+        this.weightFilter = filter != null
+            ? contextSearcher.createWeight(contextSearcher.rewrite(context.buildQuery(filter)), ScoreMode.COMPLETE_NO_SCORES, 1f)
+            : null;
+
         for (ValuesSourceConfig c : configs) {
             ItemSetMapReduceValueSource e = context.getValuesSourceRegistry().getAggregator(registryKey, c).build(c, id++);
             if (e.getField().getName() != null) {
@@ -100,20 +114,31 @@ public abstract class ItemSetMapReduceAggregator<
 
     @Override
     protected LeafBucketCollector getLeafCollector(AggregationExecutionContext ctx, LeafBucketCollector sub) throws IOException {
+
+        final Bits bits = weightFilter != null
+            ? Lucene.asSequentialAccessBits(
+                ctx.getLeafReaderContext().reader().maxDoc(),
+                weightFilter.scorerSupplier(ctx.getLeafReaderContext())
+            )
+            : null;
+
         return new LeafBucketCollectorBase(sub, null) {
             @Override
             public void collect(int doc, long owningBucketOrd) throws IOException {
                 SetOnce<IOException> firstException = new SetOnce<>();
-
-                mapReducer.map(extractors.stream().map(extractor -> {
-                    try {
-                        return extractor.collect(ctx.getLeafReaderContext(), doc);
-                    } catch (IOException e) {
-                        firstException.trySet(e);
-                        // ignored in AbstractMapReducer
-                        return null;
-                    }
-                }), getMapReduceContext(owningBucketOrd));
+                if (bits == null || bits.get(doc)) {
+                    mapReducer.map(extractors.stream().map(extractor -> {
+                        try {
+                            return extractor.collect(ctx.getLeafReaderContext(), doc);
+                        } catch (IOException e) {
+                            firstException.trySet(e);
+                            // ignored in AbstractMapReducer
+                            return null;
+                        }
+                    }), getMapReduceContext(owningBucketOrd));
+                } else {
+                    mapReducer.mapFiltered(getMapReduceContext(owningBucketOrd));
+                }
 
                 if (firstException.get() != null) {
                     throw firstException.get();
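
The per-document dispatch in `collect` can be sketched without Lucene. In the sketch below a `java.util.BitSet` stands in for the `Bits` built from the filter's `Weight`, and two counters stand in for the calls to `map` and `mapFiltered`. All names are hypothetical.

```java
import java.util.BitSet;

final class FilterDispatchSketch {

    /**
     * Walks documents 0..maxDoc-1 and returns {analyzed, filtered}.
     * A null bit set means no filter was configured, so every document is analyzed.
     */
    static long[] collect(int maxDoc, BitSet filterBits) {
        long analyzed = 0;
        long filtered = 0;
        for (int doc = 0; doc < maxDoc; doc++) {
            if (filterBits == null || filterBits.get(doc)) {
                analyzed++; // stands in for mapReducer.map(...)
            } else {
                filtered++; // stands in for mapReducer.mapFiltered(...)
            }
        }
        return new long[] { analyzed, filtered };
    }

    public static void main(String[] args) {
        BitSet matching = new BitSet(10);
        matching.set(0, 4); // documents 0..3 match the filter
        long[] counts = collect(10, matching);
        System.out.println(counts[0] + " analyzed, " + counts[1] + " filtered");
    }
}
```

Every collected document takes exactly one of the two branches, so the analyzed and filtered counts always add up to the number of collected documents.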

+ 13 - 6
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/FrequentItemSetsAggregationBuilderTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.ml.aggs.frequentitemsets;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -54,7 +55,8 @@ public class FrequentItemSetsAggregationBuilderTests extends AbstractXContentSer
             fields,
             randomDoubleBetween(0.0, 1.0, false),
             randomIntBetween(1, 20),
-            randomIntBetween(1, 20)
+            randomIntBetween(1, 20),
+            randomBoolean() ? QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)) : null
         );
     }
 
@@ -111,7 +113,8 @@ public class FrequentItemSetsAggregationBuilderTests extends AbstractXContentSer
                 ),
                 1.2,
                 randomIntBetween(1, 20),
-                randomIntBetween(1, 20)
+                randomIntBetween(1, 20),
+                null
             )
         );
         assertEquals("[minimum_support] must be greater than 0 and less or equal to 1. Found [1.2] in [fi]", e.getMessage());
@@ -126,7 +129,8 @@ public class FrequentItemSetsAggregationBuilderTests extends AbstractXContentSer
                 ),
                 randomDoubleBetween(0.0, 1.0, false),
                 -4,
-                randomIntBetween(1, 20)
+                randomIntBetween(1, 20),
+                null
             )
         );
 
@@ -142,7 +146,8 @@ public class FrequentItemSetsAggregationBuilderTests extends AbstractXContentSer
                 ),
                 randomDoubleBetween(0.0, 1.0, false),
                 randomIntBetween(1, 20),
-                -2
+                -2,
+                null
             )
         );
 
@@ -158,7 +163,8 @@ public class FrequentItemSetsAggregationBuilderTests extends AbstractXContentSer
             ),
             randomDoubleBetween(0.0, 1.0, false),
             randomIntBetween(1, 20),
-            randomIntBetween(1, 20)
+            randomIntBetween(1, 20),
+            null
         ).subAggregation(AggregationBuilders.avg("fieldA")));
 
         assertEquals("Aggregator [fi] of type [frequent_items] cannot accept sub-aggregations", e.getMessage());
@@ -173,7 +179,8 @@ public class FrequentItemSetsAggregationBuilderTests extends AbstractXContentSer
             ),
             randomDoubleBetween(0.0, 1.0, false),
             randomIntBetween(1, 20),
-            randomIntBetween(1, 20)
+            randomIntBetween(1, 20),
+            null
         ).subAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("fieldA"))));
 
         assertEquals("Aggregator [fi] of type [frequent_items] cannot accept sub-aggregations", e.getMessage());

+ 8 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/FrequentItemSetsAggregatorTests.java

@@ -79,7 +79,8 @@ public class FrequentItemSetsAggregatorTests extends AggregatorTestCase {
             List.of(new MultiValuesSourceFieldConfig.Builder().setFieldName(fieldName).build()),
             FrequentItemSetsAggregationBuilder.DEFAULT_MINIMUM_SUPPORT,
             FrequentItemSetsAggregationBuilder.DEFAULT_MINIMUM_SET_SIZE,
-            FrequentItemSetsAggregationBuilder.DEFAULT_SIZE
+            FrequentItemSetsAggregationBuilder.DEFAULT_SIZE,
+            null
         );
     }
 
@@ -118,7 +119,8 @@ public class FrequentItemSetsAggregatorTests extends AggregatorTestCase {
             fields,
             minimumSupport,
             minimumSetSize,
-            size
+            size,
+            null
         );
 
         testCase(iw -> {
@@ -273,7 +275,8 @@ public class FrequentItemSetsAggregatorTests extends AggregatorTestCase {
             fields,
             minimumSupport,
             minimumSetSize,
-            size
+            size,
+            null
         );
 
         testCase(iw -> {
@@ -471,7 +474,8 @@ public class FrequentItemSetsAggregatorTests extends AggregatorTestCase {
             fields,
             minimumSupport,
             minimumSetSize,
-            size
+            size,
+            null
         );
 
         testCase(iw -> {

+ 34 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/frequent_items_agg.yml

@@ -334,6 +334,40 @@ setup:
   - length: { aggregations.weekly.buckets.0.fi.buckets: 4 }
   - match: { aggregations.weekly.buckets.2.fi.buckets.0.doc_count: 1 }
 
+---
+"Test frequent items filter":
+
+  - do:
+      search:
+        index: store
+        body: >
+          {
+            "size": 0,
+            "aggs": {
+              "fi": {
+                "frequent_items": {
+                  "minimum_set_size": 3,
+                  "minimum_support": 0.3,
+                  "fields": [
+                    {"field": "features"},
+                    {"field": "error_message"}
+                  ],
+                  "filter": {
+                    "bool": {
+                      "must_not": {
+                        "term": {"features": "pink wheels"}
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+  - length: { aggregations.fi.buckets: 1 }
+  - match: { aggregations.fi.buckets.0.doc_count: 4 }
+  - match: { aggregations.fi.buckets.0.support: 0.4 }
+  - match: { aggregations.fi.buckets.0.key.error_message: ["compressor low pressure"] }
+
 ---
 "Test frequent items unsupported types":
   - do: