Browse Source

New feature: a Sampler aggregation used to limit any nested aggregations' processing to a sample of the top-scoring documents.
Optionally, a "diversify" setting can limit the number of collected matches that share a common value such as an "author".

Closes #8108

markharwood 10 years ago
parent
commit
63db34f649
18 changed files with 2035 additions and 175 deletions
  1. +154 -0  docs/reference/search/aggregations/bucket/sampler-aggregation.asciidoc
  2. +3 -0    src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java
  3. +8 -1    src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java
  4. +4 -0    src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java
  5. +191 -0  src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java
  6. +239 -0  src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java
  7. +68 -174 src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java
  8. +121 -0  src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedBytesHashSamplerAggregator.java
  9. +133 -0  src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedMapSamplerAggregator.java
  10. +111 -0 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedNumericSamplerAggregator.java
  11. +119 -0 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedOrdinalsSamplerAggregator.java
  12. +65 -0  src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/InternalSampler.java
  13. +29 -0  src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/Sampler.java
  14. +80 -0  src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregationBuilder.java
  15. +264 -0 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java
  16. +104 -0 src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerParser.java
  17. +80 -0  src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java
  18. +262 -0 src/test/java/org/elasticsearch/search/aggregations/bucket/SamplerTests.java

+ 154 - 0
docs/reference/search/aggregations/bucket/sampler-aggregation.asciidoc

@@ -0,0 +1,154 @@
+[[search-aggregations-bucket-sampler-aggregation]]
+=== Sampler Aggregation
+
+experimental[]
+
+A filtering aggregation used to limit any sub aggregations' processing to a sample of the top-scoring documents.
+Optionally, diversity settings can be used to limit the number of matches that share a common value such as an "author".
+
+.Example use cases:
+* Tightening the focus of analytics to high-relevance matches rather than the potentially very long tail of low-quality matches
+* Removing bias from analytics by ensuring fair representation of content from different sources
+* Reducing the running cost of aggregations that can produce useful results using only samples, e.g. `significant_terms`
+ 
+
+Example:
+
+[source,js]
+--------------------------------------------------
+{
+    "query": {
+        "match": {
+            "text": "iphone"
+        }
+    },
+    "aggs": {
+        "sample": {
+            "sampler": {
+                "shard_size": 200,
+                "field" : "user.id"   
+            },
+            "aggs": {
+                "keywords": {
+                    "significant_terms": {
+                        "field": "text"
+                    }
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+Response:
+
+[source,js]
+--------------------------------------------------
+{
+    ...
+        "aggregations": {
+        "sample": {
+            "doc_count": 1000,<1>
+            "keywords": {<2>
+                "doc_count": 1000,
+                "buckets": [
+                    ...
+                    {
+                        "key": "bend",
+                        "doc_count": 58,
+                        "score": 37.982536582524276,
+                        "bg_count": 103
+                    },
+                    ....
+}
+--------------------------------------------------
+
+<1> 1000 documents were sampled in total because we asked for a maximum of 200 from an index with 5 shards. The cost of performing the nested `significant_terms` aggregation was therefore limited rather than unbounded.
+<2> The results of the significant_terms aggregation are not skewed by any single over-active Twitter user because we asked for a maximum of one tweet from any one user in our sample.
+
+
+==== shard_size
+
+The `shard_size` parameter limits how many top-scoring documents are collected in the sample processed on each shard.
+The default value is 100.
+
+==== Controlling diversity
+Optionally, you can use the `field` or `script` and `max_docs_per_value` settings to control the maximum number of documents collected on any one shard which share a common value.
+The choice of value (e.g. `author`) is loaded from a regular `field` or derived dynamically by a `script`.
+
+The aggregation will throw an error if the choice of field or script produces multiple values for a document.
+De-duplication using multiple values per document is currently not supported, primarily due to concerns over efficiency.
+
+NOTE: Any good market researcher will tell you that when working with samples of data it is important
+that the sample represents a healthy variety of opinions rather than being skewed by any single voice.
+The same is true of aggregations: sampling with these diversify settings can offer a way to remove bias from your content (an over-populated geography, a large spike in a timeline or an over-active forum spammer).
+
+===== Field
+
+Controlling diversity using a field:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sample" : {
+            "sampler" : {
+                "field" : "author",
+                "max_docs_per_value" : 3
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+Note that the `max_docs_per_value` setting applies on a per-shard basis only for the purposes of shard-local sampling.
+It is not intended as a way of providing a global de-duplication feature on search results.
+
+
+
+===== Script
+
+Controlling diversity using a script:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sample" : {
+            "sampler" : {
+                "script" : "doc['author'].value + '/' + doc['genre'].value"
+            }
+        }
+    }
+}
+--------------------------------------------------
+Note that in the above example we use the default `max_docs_per_value` setting of 1 and combine the author and genre fields to ensure
+each shard sample contains, at most, one match for any author/genre pair.
+
+
+===== execution_hint
+
+When using the settings to control diversity, the optional `execution_hint` setting can influence the management of the values used for de-duplication.
+Each option will hold up to `shard_size` values in memory while performing de-duplication, but the type of value held can be controlled as follows:
+ 
+ - hold field values directly (`map`)
+ - hold ordinals of the field as determined by the Lucene index (`global_ordinals`)
+ - hold hashes of the field values - with potential for hash collisions (`bytes_hash`)
+ 
+The default is to use `global_ordinals` if this information is available from the Lucene index, falling back to `map` if not.
+The `bytes_hash` setting may prove faster in some cases but introduces the risk of false positives in the de-duplication logic due to hash collisions.
+Please note that Elasticsearch will ignore the choice of execution hint if it is not applicable and that there is no backward-compatibility guarantee on these hints.
+
+==== Limitations
+
+===== Cannot be nested under `breadth_first` aggregations
+Being a quality-based filter, the sampler aggregation needs access to the relevance score produced for each document.
+It therefore cannot be nested under a `terms` aggregation whose `collect_mode` is switched from the default `depth_first` mode to `breadth_first`, as this discards scores.
+In this situation an error will be thrown.
+
+===== Limited de-dup logic
+The de-duplication logic in the diversify settings applies only at the shard level and does not extend across shards.
+
+===== No specialized syntax for geo/date fields
+Currently the syntax for defining the diversifying values is defined by a choice of `field` or `script` - there is no added syntactic sugar for expressing geo or date units such as "1w" (1 week).
+Such support may be added in a later release; for now, users will have to create these sorts of values using a script.
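
For readers driving this through the Java API rather than JSON, the documented request can be assembled with the `SamplerAggregationBuilder` added in this commit. A minimal sketch, assuming the builder exposes `shardSize()`, `field()` and `maxDocsPerValue()` setters (its body is not expanded in this excerpt) and that a `Client` and a `tweets` index are available:

[source,java]
--------------------------------------------------
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsBuilder;

public class SamplerExample {
    // Runs the documented "diversified keywords" request. The setter names
    // are assumed from SamplerAggregationBuilder; "tweets" is a placeholder.
    static SearchResponse diversifiedKeywords(Client client) {
        return client.prepareSearch("tweets")
                .setQuery(QueryBuilders.matchQuery("text", "iphone"))
                .addAggregation(new SamplerAggregationBuilder("sample")
                        .shardSize(200)     // top 200 matches per shard
                        .field("user.id")   // diversify on the author
                        .maxDocsPerValue(1) // at most one tweet per author
                        .subAggregation(new SignificantTermsBuilder("keywords")
                                .field("text")))
                .execute().actionGet();
    }
}
--------------------------------------------------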

+ 3 - 0
src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java

@@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.common.inject.Module;
 import org.elasticsearch.common.inject.SpawnModules;
@@ -38,6 +39,7 @@ import org.elasticsearch.search.aggregations.bucket.range.RangeParser;
 import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeParser;
 import org.elasticsearch.search.aggregations.bucket.range.geodistance.GeoDistanceParser;
 import org.elasticsearch.search.aggregations.bucket.range.ipv4.IpRangeParser;
+import org.elasticsearch.search.aggregations.bucket.sampler.SamplerParser;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsParser;
 import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificantTermsHeuristicModule;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsParser;
@@ -80,6 +82,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
         parsers.add(MissingParser.class);
         parsers.add(FilterParser.class);
         parsers.add(FiltersParser.class);
+        parsers.add(SamplerParser.class);
         parsers.add(TermsParser.class);
         parsers.add(SignificantTermsParser.class);
         parsers.add(RangeParser.class);

+ 8 - 1
src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.search.aggregations;
 
 import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.search.aggregations.bucket.BestBucketsDeferringCollector;
 import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 import org.elasticsearch.search.internal.SearchContext.Lifetime;
@@ -136,7 +137,7 @@ public abstract class AggregatorBase extends Aggregator {
         for (int i = 0; i < subAggregators.length; ++i) {
             if (shouldDefer(subAggregators[i])) {
                 if (recordingWrapper == null) {
-                    recordingWrapper = new DeferringBucketCollector();
+                    recordingWrapper = getDeferringCollector();
                 }
                 deferredCollectors.add(subAggregators[i]);
                 subAggregators[i] = recordingWrapper.wrap(subAggregators[i]);
@@ -153,6 +154,12 @@ public abstract class AggregatorBase extends Aggregator {
         collectableSubAggregators.preCollection();
     }
 
+    public DeferringBucketCollector getDeferringCollector() {
+        // Default impl is a collector that selects the best buckets
+        // but an alternative defer policy may be based on best docs.
+        return new BestBucketsDeferringCollector();
+    }
+
     /**
      * This method should be overidden by subclasses that want to defer calculation
      * of a child aggregation until a first pass is complete and a set of buckets has
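
The new `getDeferringCollector()` hook is the extension point this commit builds on. A minimal sketch of how an `AggregatorBase` subclass opts into deferral, written as method fragments inside a hypothetical subclass; `shouldDefer(Aggregator)` is the existing hook referenced in the javadoc above, and `SamplerAggregator` (not expanded in this excerpt) follows this pattern with a `shardSize` field:

[source,java]
--------------------------------------------------
// Fragments from a hypothetical AggregatorBase subclass.
@Override
protected boolean shouldDefer(Aggregator aggregator) {
    return true; // buffer every sub-aggregation until sampling completes
}

@Override
public DeferringBucketCollector getDeferringCollector() {
    // Replay policy based on best documents rather than best buckets
    return new BestDocsDeferringCollector(shardSize);
}
--------------------------------------------------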

+ 4 - 0
src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java

@@ -36,6 +36,8 @@ import org.elasticsearch.search.aggregations.bucket.range.InternalRange;
 import org.elasticsearch.search.aggregations.bucket.range.date.InternalDateRange;
 import org.elasticsearch.search.aggregations.bucket.range.geodistance.InternalGeoDistance;
 import org.elasticsearch.search.aggregations.bucket.range.ipv4.InternalIPv4Range;
+import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler;
+import org.elasticsearch.search.aggregations.bucket.sampler.UnmappedSampler;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantLongTerms;
 import org.elasticsearch.search.aggregations.bucket.significant.SignificantStringTerms;
 import org.elasticsearch.search.aggregations.bucket.significant.UnmappedSignificantTerms;
@@ -83,6 +85,8 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
         InternalGlobal.registerStreams();
         InternalFilter.registerStreams();
         InternalFilters.registerStream();
+        InternalSampler.registerStreams();
+        UnmappedSampler.registerStreams();
         InternalMissing.registerStreams();
         StringTerms.registerStreams();
         LongTerms.registerStreams();

+ 191 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java

@@ -0,0 +1,191 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.util.packed.PackedInts;
+import org.apache.lucene.util.packed.PackedLongValues;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
+import org.elasticsearch.search.aggregations.BucketCollector;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A specialization of {@link DeferringBucketCollector} that collects all
+ * matches and then is able to replay a given subset of buckets which represent
+ * the survivors from a pruning process performed by the aggregator that owns
+ * this collector.
+ */
+public class BestBucketsDeferringCollector extends DeferringBucketCollector {
+    private static class Entry {
+        final LeafReaderContext context;
+        final PackedLongValues docDeltas;
+        final PackedLongValues buckets;
+
+        public Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
+            this.context = context;
+            this.docDeltas = docDeltas;
+            this.buckets = buckets;
+        }
+    }
+
+    final List<Entry> entries = new ArrayList<>();
+    BucketCollector collector;
+    LeafReaderContext context;
+    PackedLongValues.Builder docDeltas;
+    PackedLongValues.Builder buckets;
+    long maxBucket = -1;
+    boolean finished = false;
+    LongHash selectedBuckets;
+
+    /** Sole constructor. */
+    public BestBucketsDeferringCollector() {
+    }
+
+    @Override
+    public boolean needsScores() {
+        if (collector == null) {
+            throw new ElasticsearchIllegalStateException();
+        }
+        return collector.needsScores();
+    }
+
+    /** Set the deferred collectors. */
+    public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
+        this.collector = BucketCollector.wrap(deferredCollectors);
+    }
+
+    private void finishLeaf() {
+        if (context != null) {
+            entries.add(new Entry(context, docDeltas.build(), buckets.build()));
+        }
+        context = null;
+        docDeltas = null;
+        buckets = null;
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+        finishLeaf();
+
+        context = ctx;
+        docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
+        buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
+
+        return new LeafBucketCollector() {
+            int lastDoc = 0;
+
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                docDeltas.add(doc - lastDoc);
+                buckets.add(bucket);
+                lastDoc = doc;
+                maxBucket = Math.max(maxBucket, bucket);
+            }
+        };
+    }
+
+    @Override
+    public void preCollection() throws IOException {
+    }
+
+    @Override
+    public void postCollection() throws IOException {
+        finishLeaf();
+        finished = true;
+    }
+
+    /**
+     * Replay the wrapped collector, but only on a selection of buckets.
+     */
+    @Override
+    public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
+        if (!finished) {
+            throw new ElasticsearchIllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
+        }
+        if (this.selectedBuckets != null) {
+            throw new ElasticsearchIllegalStateException("Already been replayed");
+        }
+
+        final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
+        for (long bucket : selectedBuckets) {
+            hash.add(bucket);
+        }
+        this.selectedBuckets = hash;
+
+        collector.preCollection();
+        if (collector.needsScores()) {
+            throw new ElasticsearchIllegalStateException("Cannot defer if scores are needed");
+        }
+
+        for (Entry entry : entries) {
+            final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
+            leafCollector.setScorer(Lucene.illegalScorer("A limitation of the " + SubAggCollectionMode.BREADTH_FIRST
+                    + " collection mode is that scores cannot be buffered along with document IDs"));
+            final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
+            final PackedLongValues.Iterator buckets = entry.buckets.iterator();
+            int doc = 0;
+            for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
+                doc += docDeltaIterator.next();
+                final long bucket = buckets.next();
+                final long rebasedBucket = hash.find(bucket);
+                if (rebasedBucket != -1) {
+                    leafCollector.collect(doc, rebasedBucket);
+                }
+            }
+        }
+
+        collector.postCollection();
+    }
+
+    /**
+     * Wrap the provided aggregator so that it behaves (almost) as if it had
+     * been collected directly.
+     */
+    @Override
+    public Aggregator wrap(final Aggregator in) {
+
+        return new WrappedAggregator(in) {
+
+            @Override
+            public InternalAggregation buildAggregation(long bucket) throws IOException {
+                if (selectedBuckets == null) {
+                    throw new ElasticsearchIllegalStateException("Collection has not been replayed yet.");
+                }
+                final long rebasedBucket = selectedBuckets.find(bucket);
+                if (rebasedBucket == -1) {
+                    throw new ElasticsearchIllegalStateException("Cannot build for a bucket which has not been collected");
+                }
+                return in.buildAggregation(rebasedBucket);
+            }
+
+        };
+    }
+
+}
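
The collector above stores doc IDs as deltas: within a segment, documents are collected in increasing doc ID order, so the gaps stay small and pack tightly. A self-contained sketch of the encode/replay round trip using the same Lucene `PackedLongValues` API:

[source,java]
--------------------------------------------------
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;

public class DeltaEncodingDemo {
    public static void main(String[] args) {
        // Collection phase: store only the gaps between doc IDs (cf. collect()).
        PackedLongValues.Builder docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
        int lastDoc = 0;
        for (int doc : new int[] { 3, 10, 11, 250 }) {
            docDeltas.add(doc - lastDoc); // stores 3, 7, 1, 239
            lastDoc = doc;
        }
        // Replay phase: a running sum over the deltas recovers the doc IDs,
        // exactly as prepareSelectedBuckets() does above.
        PackedLongValues deltas = docDeltas.build();
        PackedLongValues.Iterator it = deltas.iterator();
        int doc = 0;
        for (long i = 0; i < deltas.size(); ++i) {
            doc += it.next();
            System.out.println(doc); // prints 3, 10, 11, 250
        }
    }
}
--------------------------------------------------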

+ 239 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/BestDocsDeferringCollector.java

@@ -0,0 +1,239 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TopDocsCollector;
+import org.apache.lucene.search.TopScoreDocCollector;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.search.aggregations.BucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * A specialization of {@link DeferringBucketCollector} that collects all
+ * matches and then replays only the top scoring documents to child
+ * aggregations. The method
+ * {@link BestDocsDeferringCollector#createTopDocsCollector(int)} is designed to
+ * be overridden and allows subclasses to choose a custom collector
+ * implementation for determining the top N matches.
+ * 
+ */
+
+public class BestDocsDeferringCollector extends DeferringBucketCollector {
+    final List<PerSegmentCollects> entries = new ArrayList<>();
+    BucketCollector deferred;
+    TopDocsCollector<? extends ScoreDoc> tdc;
+    boolean finished = false;
+    private int shardSize;
+    private PerSegmentCollects perSegCollector;
+    private int matchedDocs;
+
+    /**
+     * Sole constructor.
+     * 
+     * @param shardSize The number of top-scoring documents to collect and replay per shard
+     */
+    public BestDocsDeferringCollector(int shardSize) {
+        this.shardSize = shardSize;
+    }
+
+
+    @Override
+    public boolean needsScores() {
+        return true;
+    }
+
+    /** Set the deferred collectors. */
+    public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
+        this.deferred = BucketCollector.wrap(deferredCollectors);
+        try {
+            tdc = createTopDocsCollector(shardSize);
+        } catch (IOException e) {
+            throw new ElasticsearchException("IO error creating collector", e);
+        }
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+        // No per-leaf flush needed here; each segment gets its own PerSegmentCollects entry.
+        perSegCollector = new PerSegmentCollects(ctx);
+        entries.add(perSegCollector);
+
+        // Deferring collector
+        return new LeafBucketCollector() {
+            @Override
+            public void setScorer(Scorer scorer) throws IOException {
+                perSegCollector.setScorer(scorer);
+            }
+
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                perSegCollector.collect(doc);
+            }
+        };
+    }
+
+    // Designed to be overridden by subclasses that may score docs by criteria
+    // other than Lucene score
+    protected TopDocsCollector<? extends ScoreDoc> createTopDocsCollector(int size) throws IOException {
+        return TopScoreDocCollector.create(size);
+    }
+
+    @Override
+    public void preCollection() throws IOException {
+    }
+
+    @Override
+    public void postCollection() throws IOException {
+        finished = true;
+    }
+
+    /**
+     * Replay the wrapped collector, but only on a selection of buckets.
+     */
+    @Override
+    public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
+        if (!finished) {
+            throw new ElasticsearchIllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
+        }
+        if (selectedBuckets.length > 1) {
+            throw new ElasticsearchIllegalStateException("Collection only supported on a single bucket");
+        }
+
+        deferred.preCollection();
+
+        TopDocs topDocs = tdc.topDocs();
+        ScoreDoc[] sd = topDocs.scoreDocs;
+        matchedDocs = sd.length;
+        // Sort the top matches by docID for the benefit of the deferred collector
+        Arrays.sort(sd, new Comparator<ScoreDoc>() {
+            @Override
+            public int compare(ScoreDoc o1, ScoreDoc o2) {
+                return o1.doc - o2.doc;
+            }
+        });
+        try {
+            for (PerSegmentCollects perSegDocs : entries) {
+                perSegDocs.replayRelatedMatches(sd);
+            }
+            // deferred.postCollection() is deliberately called below, outside this try block.
+        } catch (IOException e) {
+            throw new ElasticsearchException("IOException collecting best scoring results", e);
+        }
+        deferred.postCollection();
+    }
+
+    class PerSegmentCollects extends Scorer {
+        private LeafReaderContext readerContext;
+        int maxDocId = Integer.MIN_VALUE;
+        private float currentScore;
+        private int currentDocId = -1;
+        private LeafCollector currentLeafCollector;
+
+        PerSegmentCollects(LeafReaderContext readerContext) throws IOException {
+            // The publisher behaviour for Reader/Scorer listeners triggers a
+            // call to this constructor with a null scorer so we can't call
+            // scorer.getWeight() and pass the Weight to our base class.
+            // However, passing null seems to have no adverse effects here...
+            super(null);
+            this.readerContext = readerContext;
+            currentLeafCollector = tdc.getLeafCollector(readerContext);
+
+        }
+
+        public void setScorer(Scorer scorer) throws IOException {
+            currentLeafCollector.setScorer(scorer);
+        }
+
+        public void replayRelatedMatches(ScoreDoc[] sd) throws IOException {
+            final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext);
+            leafCollector.setScorer(this);
+
+            currentScore = 0;
+            currentDocId = -1;
+            if (maxDocId < 0) {
+                return;
+            }
+            for (ScoreDoc scoreDoc : sd) {
+                // Doc ids from the TopDocsCollector are relative to the
+                // root-level Reader so need rebasing to this segment
+                int rebased = scoreDoc.doc - readerContext.docBase;
+                if ((rebased >= 0) && (rebased <= maxDocId)) {
+                    currentScore = scoreDoc.score;
+                    currentDocId = rebased;
+                    leafCollector.collect(rebased, 0);
+                }
+            }
+
+        }
+
+        @Override
+        public float score() throws IOException {
+            return currentScore;
+        }
+
+        @Override
+        public int freq() throws IOException {
+            throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
+        }
+
+        @Override
+        public int docID() {
+            return currentDocId;
+        }
+
+        @Override
+        public int nextDoc() throws IOException {
+            throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
+        }
+
+        @Override
+        public int advance(int target) throws IOException {
+            throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
+        }
+
+        @Override
+        public long cost() {
+            throw new ElasticsearchException("This caching scorer implementation only implements score() and docID()");
+        }
+
+        public void collect(int docId) throws IOException {
+            currentLeafCollector.collect(docId);
+            maxDocId = Math.max(maxDocId, docId);
+        }
+    }
+
+
+    public int getDocCount() {
+        return matchedDocs;
+    }
+
+}
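
The rebasing comment in `replayRelatedMatches(...)` marks the one subtle step here: the `TopDocsCollector` reports root-level doc IDs, while deferred collectors work per segment. A small sketch of the same arithmetic using Lucene's `ReaderUtil` (a hypothetical helper, for illustration only):

[source,java]
--------------------------------------------------
import java.util.List;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;

public class DocIdRebasing {
    // Maps a root-level doc ID back to its segment-local ID: the same
    // "scoreDoc.doc - readerContext.docBase" arithmetic applied above.
    static int segmentLocalDoc(int topLevelDoc, IndexReader reader) {
        List<LeafReaderContext> leaves = reader.leaves();
        LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(topLevelDoc, leaves));
        return topLevelDoc - leaf.docBase;
    }
}
--------------------------------------------------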

+ 68 - 174
src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java

@@ -20,218 +20,112 @@
 package org.elasticsearch.search.aggregations.bucket;
 
 import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.util.packed.PackedInts;
-import org.apache.lucene.util.packed.PackedLongValues;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchIllegalStateException;
-import org.elasticsearch.common.lucene.Lucene;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
-import org.elasticsearch.search.aggregations.support.AggregationContext;
 import org.elasticsearch.search.aggregations.BucketCollector;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * A {@link BucketCollector} that records collected doc IDs and buckets and
  * allows to replay a subset of the collected buckets.
  */
-public final class DeferringBucketCollector extends BucketCollector {
-
-    private static class Entry {
-        final LeafReaderContext context;
-        final PackedLongValues docDeltas;
-        final PackedLongValues buckets;
-
-        public Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
-            this.context = context;
-            this.docDeltas = docDeltas;
-            this.buckets = buckets;
-        }
-    }
-
-    final List<Entry> entries = new ArrayList<>();
-    BucketCollector collector;
-    LeafReaderContext context;
-    PackedLongValues.Builder docDeltas;
-    PackedLongValues.Builder buckets;
-    long maxBucket = -1;
-    boolean finished = false;
-    LongHash selectedBuckets;
+public abstract class DeferringBucketCollector extends BucketCollector {
 
+    private BucketCollector collector;
     /** Sole constructor. */
     public DeferringBucketCollector() {}
 
-    @Override
-    public boolean needsScores() {
-        if (collector == null) {
-            throw new ElasticsearchIllegalStateException();
-        }
-        return false;
-    }
-
     /** Set the deferred collectors. */
     public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
         this.collector = BucketCollector.wrap(deferredCollectors);
     }
+    
 
-    private void finishLeaf() {
-        if (context != null) {
-            entries.add(new Entry(context, docDeltas.build(), buckets.build()));
-        }
-        context = null;
-        docDeltas = null;
-        buckets = null;
+    public final void replay(long... selectedBuckets) throws IOException
+    {
+        prepareSelectedBuckets(selectedBuckets);
     }
 
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
-        finishLeaf();
-
-        context = ctx;
-        docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
-        buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
-
-        return new LeafBucketCollector() {
-            int lastDoc = 0;
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                docDeltas.add(doc - lastDoc);
-                buckets.add(bucket);
-                lastDoc = doc;
-                maxBucket = Math.max(maxBucket, bucket);
-            }
-        };
-    }
+    public abstract void prepareSelectedBuckets(long... selectedBuckets) throws IOException;
 
-    @Override
-    public void preCollection() throws IOException {
+    /**
+     * Wrap the provided aggregator so that it behaves (almost) as if it had
+     * been collected directly.
+     */
+    public Aggregator wrap(final Aggregator in) {
+        return new WrappedAggregator(in);
     }
 
-    @Override
-    public void postCollection() throws IOException {
-        finishLeaf();
-        finished = true;
-    }
+    protected class WrappedAggregator extends Aggregator {
+        private Aggregator in;
 
-    /**
-     * Replay the wrapped collector, but only on a selection of buckets.
-     */
-    public void replay(long... selectedBuckets) throws IOException {
-        if (!finished) {
-            throw new ElasticsearchIllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
+        WrappedAggregator(Aggregator in) {
+            this.in = in;
         }
-        if (this.selectedBuckets != null) {
-            throw new ElasticsearchIllegalStateException("Alerady been replayed");
+
+        @Override
+        public boolean needsScores() {
+            return in.needsScores();
         }
 
-        final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
-        for (long bucket : selectedBuckets) {
-            hash.add(bucket);
+        @Override
+        public void close() throws ElasticsearchException {
+            in.close();
         }
-        this.selectedBuckets = hash;
 
-        collector.preCollection();
-        if (collector.needsScores()) {
-            throw new ElasticsearchIllegalStateException("Cannot defer if scores are needed");
+        @Override
+        public String name() {
+            return in.name();
         }
 
-        for (Entry entry : entries) {
-            final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
-            leafCollector.setScorer(Lucene.illegalScorer("A limitation of the " + SubAggCollectionMode.BREADTH_FIRST
-                + " collection mode is that scores cannot be buffered along with document IDs"));
-            final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
-            final PackedLongValues.Iterator buckets = entry.buckets.iterator();
-            int doc = 0;
-            for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
-                doc += docDeltaIterator.next();
-                final long bucket = buckets.next();
-                final long rebasedBucket = hash.find(bucket);
-                if (rebasedBucket != -1) {
-                    leafCollector.collect(doc, rebasedBucket);
-                }
-            }
+        @Override
+        public Aggregator parent() {
+            return in.parent();
         }
 
-        collector.postCollection();
-    }
+        @Override
+        public AggregationContext context() {
+            return in.context();
+        }
+
+        @Override
+        public Aggregator subAggregator(String name) {
+            return in.subAggregator(name);
+        }
+
+        @Override
+        public InternalAggregation buildAggregation(long bucket) throws IOException {
+            return in.buildAggregation(bucket);
+        }
+
+        @Override
+        public InternalAggregation buildEmptyAggregation() {
+            return in.buildEmptyAggregation();
+        }
+
+        @Override
+        public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
+            throw new ElasticsearchIllegalStateException(
+                    "Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
+        }
+
+        @Override
+        public void preCollection() throws IOException {
+            throw new ElasticsearchIllegalStateException(
+                    "Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
+        }
+
+        @Override
+        public void postCollection() throws IOException {
+            throw new ElasticsearchIllegalStateException(
+                    "Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
+        }
 
-    /**
-     * Wrap the provided aggregator so that it behaves (almost) as if it had been
-     * collected directly.
-     */
-    public Aggregator wrap(final Aggregator in) {
-        return new Aggregator() {
-
-            @Override
-            public boolean needsScores() {
-                return in.needsScores();
-            }
-
-            @Override
-            public void close() throws ElasticsearchException {
-                in.close();
-            }
-
-            @Override
-            public String name() {
-                return in.name();
-            }
-
-            @Override
-            public Aggregator parent() {
-                return in.parent();
-            }
-
-            @Override
-            public AggregationContext context() {
-                return in.context();
-            }
-
-            @Override
-            public Aggregator subAggregator(String name) {
-                return in.subAggregator(name);
-            }
-
-            @Override
-            public InternalAggregation buildAggregation(long bucket) throws IOException {
-                if (selectedBuckets == null) {
-                    throw new ElasticsearchIllegalStateException("Collection has not been replayed yet.");
-                }
-                final long rebasedBucket = selectedBuckets.find(bucket);
-                if (rebasedBucket == -1) {
-                    throw new ElasticsearchIllegalStateException("Cannot build for a bucket which has not been collected");
-                }
-                return in.buildAggregation(rebasedBucket);
-            }
-
-            @Override
-            public InternalAggregation buildEmptyAggregation() {
-                return in.buildEmptyAggregation();
-            }
-
-            @Override
-            public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
-                throw new ElasticsearchIllegalStateException("Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
-            }
-
-            @Override
-            public void preCollection() throws IOException {
-                throw new ElasticsearchIllegalStateException("Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
-            }
-
-            @Override
-            public void postCollection() throws IOException {
-                throw new ElasticsearchIllegalStateException("Deferred collectors cannot be collected directly. They must be collected through the recording wrapper.");
-            }
-
-        };
     }
 
 }
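
After this refactor the base class pins down the two-pass protocol and leaves only the replay policy (`prepareSelectedBuckets`) to subclasses. A sketch of the calling sequence using only the methods shown above; in production these calls are driven by `AggregatorBase` and the owning aggregator, not by user code:

[source,java]
--------------------------------------------------
import java.io.IOException;

import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;

public class TwoPassSketch {
    static void collectThenReplay(DeferringBucketCollector recorder,
                                  Iterable<BucketCollector> deferred,
                                  long... survivingBuckets) throws IOException {
        recorder.setDeferredCollector(deferred); // targets for the replay
        recorder.preCollection();
        // Pass 1: the search feeds matches through
        // recorder.getLeafCollector(ctx).collect(doc, bucket) per segment.
        recorder.postCollection();
        recorder.replay(survivingBuckets); // pass 2: only surviving buckets
    }
}
--------------------------------------------------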

+ 121 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedBytesHashSamplerAggregator.java

@@ -0,0 +1,121 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.search.DiversifiedTopDocsCollector;
+import org.apache.lucene.search.DiversifiedTopDocsCollector.ScoreDocKey;
+import org.apache.lucene.search.TopDocsCollector;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector;
+import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Alternative, faster implementation for converting String keys to longs but
+ * with the potential for hash collisions.
+ */
+public class DiversifiedBytesHashSamplerAggregator extends SamplerAggregator {
+
+    private ValuesSource valuesSource;
+    private int maxDocsPerValue;
+
+    public DiversifiedBytesHashSamplerAggregator(String name, int shardSize, AggregatorFactories factories,
+            AggregationContext aggregationContext, Aggregator parent, Map<String, Object> metaData, ValuesSource valuesSource,
+            int maxDocsPerValue) throws IOException {
+        super(name, shardSize, factories, aggregationContext, parent, metaData);
+        this.valuesSource = valuesSource;
+        this.maxDocsPerValue = maxDocsPerValue;
+    }
+
+    @Override
+    public DeferringBucketCollector getDeferringCollector() {
+        bdd = new DiverseDocsDeferringCollector();
+        return bdd;
+    }
+
+    /**
+     * A {@link DeferringBucketCollector} that identifies top-scoring documents,
+     * de-duplicated by a key, and passes only these on to nested collectors.
+     * This implementation is only for use with a single bucket aggregation.
+     */
+    class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
+
+        public DiverseDocsDeferringCollector() {
+            super(shardSize);
+        }
+
+
+        @Override
+        protected TopDocsCollector<ScoreDocKey> createTopDocsCollector(int size) {
+            return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue);
+        }
+
+        // This class extends the DiversifiedTopDocsCollector and provides
+        // a lookup from elasticsearch's ValuesSource
+        class ValuesDiversifiedTopDocsCollector extends DiversifiedTopDocsCollector {
+
+            private SortedBinaryDocValues values;
+
+            public ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerValue) {
+                super(numHits, maxHitsPerValue);
+
+            }
+
+            @Override
+            protected NumericDocValues getKeys(LeafReaderContext context) {
+                try {
+                    values = valuesSource.bytesValues(context);
+                } catch (IOException e) {
+                    throw new ElasticsearchException("Error reading values", e);
+                }
+                return new NumericDocValues() {
+                    @Override
+                    public long get(int doc) {
+
+                        values.setDocument(doc);
+                        final int valuesCount = values.count();
+                        if (valuesCount > 1) {
+                            throw new ElasticsearchIllegalArgumentException("Sample diversifying key must be a single-valued field");
+                        }
+                        if (valuesCount == 1) {
+                            final BytesRef bytes = values.valueAt(0);
+                            return bytes.hashCode();
+                        }
+                        return 0;
+                    }
+                };
+            }
+
+        }
+
+    }
+
+}

+ 133 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedMapSamplerAggregator.java

@@ -0,0 +1,133 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.search.DiversifiedTopDocsCollector;
+import org.apache.lucene.search.DiversifiedTopDocsCollector.ScoreDocKey;
+import org.apache.lucene.search.TopDocsCollector;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector;
+import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class DiversifiedMapSamplerAggregator extends SamplerAggregator {
+
+    private ValuesSource valuesSource;
+    private int maxDocsPerValue;
+    private BytesRefHash bucketOrds;
+
+    public DiversifiedMapSamplerAggregator(String name, int shardSize, AggregatorFactories factories,
+            AggregationContext aggregationContext, Aggregator parent, Map<String, Object> metaData, ValuesSource valuesSource,
+            int maxDocsPerValue) throws IOException {
+        super(name, shardSize, factories, aggregationContext, parent, metaData);
+        this.valuesSource = valuesSource;
+        this.maxDocsPerValue = maxDocsPerValue;
+        bucketOrds = new BytesRefHash(shardSize, aggregationContext.bigArrays());
+
+    }
+
+    @Override
+    protected void doClose() {
+        Releasables.close(bucketOrds);
+        super.doClose();
+    }
+
+    @Override
+    public DeferringBucketCollector getDeferringCollector() {
+        bdd = new DiverseDocsDeferringCollector();
+        return bdd;
+    }
+
+    /**
+     * A {@link DeferringBucketCollector} that identifies top-scoring documents,
+     * de-duplicated by a key, and passes only these on to nested collectors.
+     * This implementation is only for use with a single bucket aggregation.
+     */
+    class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
+
+        public DiverseDocsDeferringCollector() {
+            super(shardSize);
+        }
+
+
+        @Override
+        protected TopDocsCollector<ScoreDocKey> createTopDocsCollector(int size) {
+            return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue);
+        }
+
+        // This class extends the DiversifiedTopDocsCollector and provides
+        // a lookup from elasticsearch's ValuesSource
+        class ValuesDiversifiedTopDocsCollector extends DiversifiedTopDocsCollector {
+
+            private SortedBinaryDocValues values;
+
+            public ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerKey) {
+                super(numHits, maxHitsPerKey);
+
+            }
+
+            @Override
+            protected NumericDocValues getKeys(LeafReaderContext context) {
+                try {
+                    values = valuesSource.bytesValues(context);
+                } catch (IOException e) {
+                    throw new ElasticsearchException("Error reading values", e);
+                }
+                return new NumericDocValues() {
+                    @Override
+                    public long get(int doc) {
+
+                        values.setDocument(doc);
+                        final int valuesCount = values.count();
+                        if (valuesCount > 1) {
+                            throw new ElasticsearchIllegalArgumentException("Sample diversifying key must be a single-valued field");
+                        }
+                        if (valuesCount == 1) {
+                            final BytesRef bytes = values.valueAt(0);
+
+                            long bucketOrdinal = bucketOrds.add(bytes);
+                            if (bucketOrdinal < 0) { // already seen
+                                bucketOrdinal = -1 - bucketOrdinal;
+                            }
+                            return bucketOrdinal;
+                        }
+                        return 0;
+                    }
+                };
+            }
+
+        }
+
+    }
+
+}
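
The map-based variant relies on `BytesRefHash` handing out stable ordinals: `add()` returns a fresh ordinal for an unseen key and `-1 - existingOrdinal` for a repeat, which `getKeys()` above rebases so that equal authors collapse onto the same long key. A self-contained demonstration of that trick:

[source,java]
--------------------------------------------------
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;

public class OrdinalTrickDemo {
    public static void main(String[] args) {
        BytesRefHash bucketOrds = new BytesRefHash(16, BigArrays.NON_RECYCLING_INSTANCE);
        long first = bucketOrds.add(new BytesRef("kimchy"));  // 0: a new key
        long repeat = bucketOrds.add(new BytesRef("kimchy")); // -1 - 0 == -1
        if (repeat < 0) {
            repeat = -1 - repeat; // rebase a repeat back to its ordinal
        }
        System.out.println(first + " " + repeat); // 0 0
        bucketOrds.close(); // BytesRefHash is Releasable (cf. doClose() above)
    }
}
--------------------------------------------------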

+ 111 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedNumericSamplerAggregator.java

@@ -0,0 +1,111 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.search.DiversifiedTopDocsCollector;
+import org.apache.lucene.search.DiversifiedTopDocsCollector.ScoreDocKey;
+import org.apache.lucene.search.TopDocsCollector;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector;
+import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class DiversifiedNumericSamplerAggregator extends SamplerAggregator {
+
+    private ValuesSource.Numeric valuesSource;
+    private int maxDocsPerValue;
+
+    public DiversifiedNumericSamplerAggregator(String name, int shardSize, AggregatorFactories factories,
+            AggregationContext aggregationContext, Aggregator parent, Map<String, Object> metaData, ValuesSource.Numeric valuesSource,
+            int maxDocsPerValue) throws IOException {
+        super(name, shardSize, factories, aggregationContext, parent, metaData);
+        this.valuesSource = valuesSource;
+        this.maxDocsPerValue = maxDocsPerValue;
+    }
+
+    @Override
+    public DeferringBucketCollector getDeferringCollector() {
+        bdd = new DiverseDocsDeferringCollector();
+        return bdd;
+    }
+
+    /**
+     * A {@link DeferringBucketCollector} that identifies top-scoring documents,
+     * de-duplicated by a key, and passes only these on to nested collectors.
+     * This implementation is only for use with a single bucket aggregation.
+     */
+    class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
+        public DiverseDocsDeferringCollector() {
+            super(shardSize);
+        }
+
+        @Override
+        protected TopDocsCollector<ScoreDocKey> createTopDocsCollector(int size) {
+            return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue);
+        }
+
+        // This class extends the DiversifiedTopDocsCollector and provides
+        // a lookup from elasticsearch's ValuesSource
+        class ValuesDiversifiedTopDocsCollector extends DiversifiedTopDocsCollector {
+
+            private SortedNumericDocValues values;
+
+            public ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerKey) {
+                super(numHits, maxHitsPerKey);
+            }
+
+            @Override
+            protected NumericDocValues getKeys(LeafReaderContext context) {
+                try {
+                    values = valuesSource.longValues(context);
+                } catch (IOException e) {
+                    throw new ElasticsearchException("Error reading values", e);
+                }
+                return new NumericDocValues() {
+                    @Override
+                    public long get(int doc) {
+                        values.setDocument(doc);
+                        final int valuesCount = values.count();
+                        if (valuesCount > 1) {
+                            throw new ElasticsearchIllegalArgumentException("Sample diversifying key must be a single-valued field");
+                        }
+                        if (valuesCount == 1) {
+                            return values.valueAt(0);
+                        }
+                        return Long.MIN_VALUE;
+                    }
+                };
+            }
+
+        }
+
+    }
+
+}

+ 119 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/DiversifiedOrdinalsSamplerAggregator.java

@@ -0,0 +1,119 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.RandomAccessOrds;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.search.DiversifiedTopDocsCollector;
+import org.apache.lucene.search.DiversifiedTopDocsCollector.ScoreDocKey;
+import org.apache.lucene.search.TopDocsCollector;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector;
+import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class DiversifiedOrdinalsSamplerAggregator extends SamplerAggregator {
+
+    private ValuesSource.Bytes.WithOrdinals.FieldData valuesSource;
+    private int maxDocsPerValue;
+
+    public DiversifiedOrdinalsSamplerAggregator(String name, int shardSize, AggregatorFactories factories,
+            AggregationContext aggregationContext, Aggregator parent, Map<String, Object> metaData,
+            ValuesSource.Bytes.WithOrdinals.FieldData valuesSource, int maxDocsPerValue) throws IOException {
+        super(name, shardSize, factories, aggregationContext, parent, metaData);
+        this.valuesSource = valuesSource;
+        this.maxDocsPerValue = maxDocsPerValue;
+    }
+
+    @Override
+    public DeferringBucketCollector getDeferringCollector() {
+        bdd = new DiverseDocsDeferringCollector();
+        return bdd;
+    }
+
+    /**
+     * A {@link DeferringBucketCollector} that identifies the top-scoring
+     * documents, de-duplicated by a diversity key, and passes only these on
+     * to nested collectors. This implementation is only for use with a
+     * single bucket aggregation.
+     */
+    class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
+
+        public DiverseDocsDeferringCollector() {
+            super(shardSize);
+        }
+
+        @Override
+        protected TopDocsCollector<ScoreDocKey> createTopDocsCollector(int size) {
+            return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue);
+        }
+
+        // This class extends DiversifiedTopDocsCollector, providing a key
+        // lookup backed by Elasticsearch's ValuesSource
+        class ValuesDiversifiedTopDocsCollector extends DiversifiedTopDocsCollector {
+
+            public ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerKey) {
+                super(numHits, maxHitsPerKey);
+            }
+
+            @Override
+            protected NumericDocValues getKeys(LeafReaderContext context) {
+                final RandomAccessOrds globalOrds = valuesSource.globalOrdinalsValues(context);
+                final SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds);
+                if (singleValues != null) {
+                    return new NumericDocValues() {
+                        @Override
+                        public long get(int doc) {
+                            return singleValues.getOrd(doc);
+                        }
+                    };
+                }
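+                // Multi-valued reader: enforce the single-valued-key rule at
+                // read time, mapping docs with no value to Long.MIN_VALUE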
+                return new NumericDocValues() {
+                    @Override
+                    public long get(int doc) {
+                        globalOrds.setDocument(doc);
+                        final long valuesCount = globalOrds.cardinality();
+                        if (valuesCount > 1) {
+                            throw new ElasticsearchIllegalArgumentException("Sample diversifying key must be a single valued-field");
+                        }
+                        if (valuesCount == 1) {
+                            return globalOrds.ordAt(0);
+                        }
+                        return Long.MIN_VALUE;
+                    }
+                };
+
+            }
+
+        }
+
+    }
+
+}

+ 65 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/InternalSampler.java

@@ -0,0 +1,65 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Result of the {@code sampler} aggregation: a single bucket holding a
+ * sample of top-matching documents and their sub-aggregation results.
+ */
+public class InternalSampler extends InternalSingleBucketAggregation implements Sampler {
+
+    public final static Type TYPE = new Type("sampler");
+
+    public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
+        @Override
+        public InternalSampler readResult(StreamInput in) throws IOException {
+            InternalSampler result = new InternalSampler();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        AggregationStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    InternalSampler() {
+    } // for serialization
+
+    InternalSampler(String name, long docCount, InternalAggregations subAggregations, Map<String, Object> metaData) {
+        super(name, docCount, subAggregations, metaData);
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
+        return new InternalSampler(name, docCount, subAggregations, metaData);
+    }
+}

+ 29 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/Sampler.java

@@ -0,0 +1,29 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
+
+/**
+ * A {@code filter} aggregation that defines a single bucket to hold a sample of
+ * top-matching documents. Computation of child aggregations is deferred until
+ * the top-matching documents on a shard have been determined.
+ */
+public interface Sampler extends SingleBucketAggregation {
+}

+ 80 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregationBuilder.java

@@ -0,0 +1,80 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.ValuesSourceAggregationBuilder;
+
+import java.io.IOException;
+
+/**
+ * Builder for the {@link Sampler} aggregation.
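+ * <p>
+ * A minimal usage sketch (the field and sub-aggregation names are
+ * illustrative, mirroring the integration tests):
+ * <pre>
+ * SamplerAggregationBuilder sample = new SamplerAggregationBuilder("sample")
+ *         .shardSize(100)
+ *         .field("author")
+ *         .maxDocsPerValue(1);
+ * sample.subAggregation(new TermsBuilder("authors").field("author"));
+ * </pre>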
+ */
+public class SamplerAggregationBuilder extends ValuesSourceAggregationBuilder<SamplerAggregationBuilder> {
+
+    private int shardSize = SamplerParser.DEFAULT_SHARD_SAMPLE_SIZE;
+
+    int maxDocsPerValue = SamplerParser.MAX_DOCS_PER_VALUE_DEFAULT;
+    String executionHint = null;
+
+    /**
+     * Sole constructor.
+     */
+    public SamplerAggregationBuilder(String name) {
+        super(name, InternalSampler.TYPE.name());
+    }
+
+    /**
+     * Set the maximum number of top-scoring documents collected as the sample on each shard.
+     */
+    public SamplerAggregationBuilder shardSize(int shardSize) {
+        this.shardSize = shardSize;
+        return this;
+    }
+
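+    /**
+     * Set the maximum number of documents that may share any one value of the
+     * diversifying field within the sample collected on each shard.
+     */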
+    public SamplerAggregationBuilder maxDocsPerValue(int maxDocsPerValue) {
+        this.maxDocsPerValue = maxDocsPerValue;
+        return this;
+    }
+
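+    /**
+     * Set a hint for the mechanism used to de-duplicate values ("map",
+     * "bytes_hash" or "global_ordinals"); an unsuitable hint may be overridden.
+     */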
+    public SamplerAggregationBuilder executionHint(String executionHint) {
+        this.executionHint = executionHint;
+        return this;
+    }
+
+    @Override
+    protected XContentBuilder doInternalXContent(XContentBuilder builder, Params params) throws IOException {
+        if (shardSize != SamplerParser.DEFAULT_SHARD_SAMPLE_SIZE) {
+            builder.field(SamplerParser.SHARD_SIZE_FIELD.getPreferredName(), shardSize);
+        }
+
+        if (maxDocsPerValue != SamplerParser.MAX_DOCS_PER_VALUE_DEFAULT) {
+            builder.field(SamplerParser.MAX_DOCS_PER_VALUE_FIELD.getPreferredName(), maxDocsPerValue);
+        }
+        if (executionHint != null) {
+            builder.field(SamplerParser.EXECUTION_HINT_FIELD.getPreferredName(), executionHint);
+        }
+
+        return builder;
+    }
+
+}

+ 264 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java

@@ -0,0 +1,264 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.NonCollectingAggregator;
+import org.elasticsearch.search.aggregations.bucket.BestDocsDeferringCollector;
+import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Aggregate on only the top-scoring docs on a shard.
+ *
+ * TODO currently the diversity feature of this agg offers only 'script' and
+ * 'field' as a means of generating a de-dup value. In the future it would be
+ * nice if users could use any of the "bucket" aggs syntax (geo, date
+ * histogram...) as the basis for generating de-dup values. Their syntax for
+ * creating bucket values would be preferable to users having to recreate this
+ * logic in a 'script' e.g. to turn a datetime in milliseconds into a month
+ * key value.
+ */
+public class SamplerAggregator extends SingleBucketAggregator {
+
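+    /**
+     * The mechanisms available for de-duplicating the diversity key,
+     * selectable via the {@code execution_hint} setting. A hint the chosen
+     * values source cannot support is overridden: {@code global_ordinals}
+     * falls back to {@code map} for sources without ordinals (see
+     * {@link DiversifiedFactory}).
+     */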
+    public enum ExecutionMode {
+
+        MAP(new ParseField("map")) {
+
+            @Override
+            Aggregator create(String name, AggregatorFactories factories, int shardSize, int maxDocsPerValue, ValuesSource valuesSource,
+                    AggregationContext context, Aggregator parent, Map<String, Object> metaData) throws IOException {
+
+                return new DiversifiedMapSamplerAggregator(name, shardSize, factories, context, parent, metaData, valuesSource,
+                        maxDocsPerValue);
+            }
+
+            @Override
+            boolean needsGlobalOrdinals() {
+                return false;
+            }
+
+        },
+        BYTES_HASH(new ParseField("bytes_hash")) {
+
+            @Override
+            Aggregator create(String name, AggregatorFactories factories, int shardSize, int maxDocsPerValue, ValuesSource valuesSource,
+                    AggregationContext context, Aggregator parent, Map<String, Object> metaData) throws IOException {
+
+                return new DiversifiedBytesHashSamplerAggregator(name, shardSize, factories, context, parent, metaData, valuesSource,
+                        maxDocsPerValue);
+            }
+
+            @Override
+            boolean needsGlobalOrdinals() {
+                return false;
+            }
+
+        },
+        GLOBAL_ORDINALS(new ParseField("global_ordinals")) {
+
+            @Override
+            Aggregator create(String name, AggregatorFactories factories, int shardSize, int maxDocsPerValue, ValuesSource valuesSource,
+                    AggregationContext context, Aggregator parent, Map<String, Object> metaData) throws IOException {
+                return new DiversifiedOrdinalsSamplerAggregator(name, shardSize, factories, context, parent, metaData,
+                        (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, maxDocsPerValue);
+            }
+
+            @Override
+            boolean needsGlobalOrdinals() {
+                return true;
+            }
+
+        };
+
+        public static ExecutionMode fromString(String value) {
+            for (ExecutionMode mode : values()) {
+                if (mode.parseField.match(value)) {
+                    return mode;
+                }
+            }
+            throw new ElasticsearchIllegalArgumentException("Unknown `execution_hint`: [" + value + "], expected any of " + values());
+        }
+
+        private final ParseField parseField;
+
+        ExecutionMode(ParseField parseField) {
+            this.parseField = parseField;
+        }
+
+        abstract Aggregator create(String name, AggregatorFactories factories, int shardSize, int maxDocsPerValue, ValuesSource valuesSource,
+                AggregationContext context, Aggregator parent, Map<String, Object> metaData) throws IOException;
+
+        abstract boolean needsGlobalOrdinals();
+
+        @Override
+        public String toString() {
+            return parseField.getPreferredName();
+        }
+    }
+
+
+    protected final int shardSize;
+    protected BestDocsDeferringCollector bdd;
+
+    public SamplerAggregator(String name, int shardSize, AggregatorFactories factories,
+            AggregationContext aggregationContext, Aggregator parent, Map<String, Object> metaData) throws IOException {
+        super(name, factories, aggregationContext, parent, metaData);
+        this.shardSize = shardSize;
+    }
+
+    @Override
+    public boolean needsScores() {
+        return true;
+    }
+
+    @Override
+    public DeferringBucketCollector getDeferringCollector() {
+        bdd = new BestDocsDeferringCollector(shardSize);
+        return bdd;
+    }
+
+    @Override
+    protected boolean shouldDefer(Aggregator aggregator) {
+        return true;
+    }
+
+    @Override
+    public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
+        runDeferredCollections(owningBucketOrdinal);
+        return new InternalSampler(name, bdd == null ? 0 : bdd.getDocCount(), bucketAggregations(owningBucketOrdinal), metaData());
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalSampler(name, 0, buildEmptySubAggregations(), metaData());
+    }
+
+    public static class Factory extends AggregatorFactory {
+
+        private int shardSize;
+
+        public Factory(String name, int shardSize) {
+            super(name, InternalSampler.TYPE.name());
+            this.shardSize = shardSize;
+        }
+
+        @Override
+        public Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
+                Map<String, Object> metaData) throws IOException {
+
+            if (collectsFromSingleBucket == false) {
+                return asMultiBucketAggregator(this, context, parent);
+            }
+            return new SamplerAggregator(name, shardSize, factories, context, parent, metaData);
+        }
+
+    }
+
+    public static class DiversifiedFactory extends ValuesSourceAggregatorFactory<ValuesSource> {
+
+        private int shardSize;
+        private int maxDocsPerValue;
+        private String executionHint;
+
+        public DiversifiedFactory(String name, int shardSize, String executionHint, ValuesSourceConfig vsConfig, int maxDocsPerValue) {
+            super(name, InternalSampler.TYPE.name(), vsConfig);
+            this.shardSize = shardSize;
+            this.maxDocsPerValue = maxDocsPerValue;
+            this.executionHint = executionHint;
+        }
+
+        @Override
+        protected Aggregator doCreateInternal(ValuesSource valuesSource, AggregationContext context, Aggregator parent,
+                boolean collectsFromSingleBucket, Map<String, Object> metaData) throws IOException {
+
+            if (collectsFromSingleBucket == false) {
+                return asMultiBucketAggregator(this, context, parent);
+            }
+
+            if (valuesSource instanceof ValuesSource.Numeric) {
+                return new DiversifiedNumericSamplerAggregator(name, shardSize, factories, context, parent, metaData,
+                        (Numeric) valuesSource, maxDocsPerValue);
+            }
+
+            if (valuesSource instanceof ValuesSource.Bytes) {
+                ExecutionMode execution = null;
+                if (executionHint != null) {
+                    execution = ExecutionMode.fromString(executionHint);
+                }
+                if (execution == null) {
+                    execution = ExecutionMode.GLOBAL_ORDINALS;
+                }
+                // In some cases using global ordinals is just not supported: override the hint
+                if (execution.needsGlobalOrdinals() && !(valuesSource instanceof ValuesSource.Bytes.WithOrdinals)) {
+                    execution = ExecutionMode.MAP;
+                }
+                return execution.create(name, factories, shardSize, maxDocsPerValue, valuesSource, context, parent, metaData);
+            }
+
+            throw new AggregationExecutionException("Sampler aggregation cannot be applied to field [" + config.fieldContext().field() +
+                    "]. It can only be applied to numeric or string fields.");
+        }
+
+        @Override
+        protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent, Map<String, Object> metaData)
+                throws IOException {
+            final UnmappedSampler aggregation = new UnmappedSampler(name, metaData);
+
+            return new NonCollectingAggregator(name, aggregationContext, parent, factories, metaData) {
+                @Override
+                public InternalAggregation buildEmptyAggregation() {
+                    return aggregation;
+                }
+            };
+        }
+
+    }
+
+    @Override
+    protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
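+        // Collection is fully deferred; bdd is only created via
+        // getDeferringCollector(), which the framework requests when there
+        // are child aggregations to defer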
+        if (bdd == null) {
+            throw new AggregationExecutionException("Sampler aggregation must be used with child aggregations.");
+        }
+        return bdd.getLeafCollector(ctx);
+    }
+
+}
+

+ 104 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerParser.java

@@ -0,0 +1,104 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+
+/**
+ * Parses the {@code sampler} aggregation request, handling the optional
+ * {@code shard_size}, {@code max_docs_per_value} and {@code execution_hint}
+ * settings alongside the standard "field" and "script" values-source options.
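+ * <p>
+ * An illustrative request body (the field name is an example only):
+ * <pre>
+ * "sampler" : {
+ *     "shard_size" : 100,
+ *     "field" : "author",
+ *     "max_docs_per_value" : 1,
+ *     "execution_hint" : "map"
+ * }
+ * </pre>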
+ */
+public class SamplerParser implements Aggregator.Parser {
+
+    public static final int DEFAULT_SHARD_SAMPLE_SIZE = 100;
+    public static final ParseField SHARD_SIZE_FIELD = new ParseField("shard_size");
+    public static final ParseField MAX_DOCS_PER_VALUE_FIELD = new ParseField("max_docs_per_value");
+    public static final ParseField EXECUTION_HINT_FIELD = new ParseField("execution_hint");
+    public static final boolean DEFAULT_USE_GLOBAL_ORDINALS = false;
+    public static final int MAX_DOCS_PER_VALUE_DEFAULT = 1;
+
+    @Override
+    public String type() {
+        return InternalSampler.TYPE.name();
+    }
+
+    @Override
+    public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
+
+        XContentParser.Token token;
+        String currentFieldName = null;
+        String executionHint = null;
+        int shardSize = DEFAULT_SHARD_SAMPLE_SIZE;
+        int maxDocsPerValue = MAX_DOCS_PER_VALUE_DEFAULT;
+        boolean diversityChoiceMade = false;
+
+        ValuesSourceParser vsParser = ValuesSourceParser.any(aggregationName, InternalSampler.TYPE, context).scriptable(true)
+                .formattable(false).build();
+
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (vsParser.token(currentFieldName, token, parser)) {
+                continue;
+            } else if (token == XContentParser.Token.VALUE_NUMBER) {
+                if (SHARD_SIZE_FIELD.match(currentFieldName)) {
+                    shardSize = parser.intValue();
+                } else if (MAX_DOCS_PER_VALUE_FIELD.match(currentFieldName)) {
+                    diversityChoiceMade = true;
+                    maxDocsPerValue = parser.intValue();
+                } else {
+                    throw new SearchParseException(context, "Unsupported property \"" + currentFieldName + "\" for aggregation \""
+                            + aggregationName + "\"");
+                }
+            } else if (token == XContentParser.Token.VALUE_STRING) {
+                // vsParser.token() has already been given first refusal on this
+                // token, so only sampler-specific settings remain valid here
+                if (EXECUTION_HINT_FIELD.match(currentFieldName)) {
+                    executionHint = parser.text();
+                } else {
+                    throw new SearchParseException(context, "Unsupported property \"" + currentFieldName + "\" for aggregation \""
+                            + aggregationName + "\"");
+                }
+            } else {
+                throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].");
+            }
+        }
+
+        ValuesSourceConfig vsConfig = vsParser.config();
+        if (vsConfig.valid()) {
+            return new SamplerAggregator.DiversifiedFactory(aggregationName, shardSize, executionHint, vsConfig, maxDocsPerValue);
+        } else {
+            if (diversityChoiceMade) {
+                throw new SearchParseException(context, "Sampler aggregation has " + MAX_DOCS_PER_VALUE_FIELD.getPreferredName()
+                        + " setting but no \"field\" or \"script\" setting to provide values for aggregation \"" + aggregationName + "\"");
+
+            }
+            return new SamplerAggregator.Factory(aggregationName, shardSize);
+        }
+    }
+}

+ 80 - 0
src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/UnmappedSampler.java

@@ -0,0 +1,80 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket.sampler;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Result of the {@code sampler} aggregation when the diversifying field or
+ * script is unmapped on the targeted shards.
+ */
+public class UnmappedSampler extends InternalSampler {
+
+    public static final Type TYPE = new Type("sampler", "umsampler");
+
+    public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
+        @Override
+        public UnmappedSampler readResult(StreamInput in) throws IOException {
+            UnmappedSampler sampler = new UnmappedSampler();
+            sampler.readFrom(in);
+            return sampler;
+        }
+    };
+
+    public static void registerStreams() {
+        AggregationStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    UnmappedSampler() {
+    }
+
+    public UnmappedSampler(String name, Map<String, Object> metaData) {
+        super(name, 0, InternalAggregations.EMPTY, metaData);
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        for (InternalAggregation agg : aggregations) {
+            if (!(agg instanceof UnmappedSampler)) {
+                return agg.reduce(aggregations, reduceContext);
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        builder.field(InternalAggregation.CommonFields.DOC_COUNT, 0);
+        return builder;
+    }
+
+}

+ 262 - 0
src/test/java/org/elasticsearch/search/aggregations/bucket/SamplerTests.java

@@ -0,0 +1,262 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.search.aggregations.bucket;
+
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.aggregations.bucket.sampler.Sampler;
+import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+/**
+ * Tests the Sampler aggregation
+ */
+@ElasticsearchIntegrationTest.SuiteScopeTest
+public class SamplerTests extends ElasticsearchIntegrationTest {
+
+    public static final int NUM_SHARDS = 2;
+
+    public String randomExecutionHint() {
+        return randomBoolean() ? null : randomFrom(SamplerAggregator.ExecutionMode.values()).toString();
+    }
+
+    @Override
+    public void setupSuiteScopeCluster() throws Exception {
+        assertAcked(prepareCreate("test").setSettings(SETTING_NUMBER_OF_SHARDS, NUM_SHARDS, SETTING_NUMBER_OF_REPLICAS, 0).addMapping(
+                "book", "author", "type=string,index=not_analyzed", "name", "type=string,index=analyzed", "genre",
+                "type=string,index=not_analyzed"));
+        createIndex("idx_unmapped");
+        // idx_unmapped_author is same as main index but missing author field
+        assertAcked(prepareCreate("idx_unmapped_author").setSettings(SETTING_NUMBER_OF_SHARDS, NUM_SHARDS, SETTING_NUMBER_OF_REPLICAS, 0)
+                .addMapping("book", "name", "type=string,index=analyzed", "genre", "type=string,index=not_analyzed"));
+
+        ensureGreen();
+        String[] data = {
+                // "id,cat,name,price,inStock,author_t,series_t,sequence_i,genre_s",
+                "0553573403,book,A Game of Thrones,7.99,true,George R.R. Martin,A Song of Ice and Fire,1,fantasy",
+                "0553579908,book,A Clash of Kings,7.99,true,George R.R. Martin,A Song of Ice and Fire,2,fantasy",
+                "055357342X,book,A Storm of Swords,7.99,true,George R.R. Martin,A Song of Ice and Fire,3,fantasy",
+                "0553293354,book,Foundation,7.99,true,Isaac Asimov,Foundation Novels,1,scifi",
+                "0812521390,book,The Black Company,6.99,false,Glen Cook,The Chronicles of The Black Company,1,fantasy",
+                "0812550706,book,Ender's Game,6.99,true,Orson Scott Card,Ender,1,scifi",
+                "0441385532,book,Jhereg,7.95,false,Steven Brust,Vlad Taltos,1,fantasy",
+                "0380014300,book,Nine Princes In Amber,6.99,true,Roger Zelazny,the Chronicles of Amber,1,fantasy",
+                "0805080481,book,The Book of Three,5.99,true,Lloyd Alexander,The Chronicles of Prydain,1,fantasy",
+                "080508049X,book,The Black Cauldron,5.99,true,Lloyd Alexander,The Chronicles of Prydain,2,fantasy"
+        };
+
+        for (int i = 0; i < data.length; i++) {
+            String[] parts = data[i].split(",");
+            client().prepareIndex("test", "book", "" + i).setSource("author", parts[5], "name", parts[2], "genre", parts[8]).get();
+            client().prepareIndex("idx_unmapped_author", "book", "" + i).setSource("name", parts[2], "genre", parts[8]).get();
+        }
+        client().admin().indices().refresh(new RefreshRequest("test")).get();
+    }
+
+    @Test
+    public void noDiversity() throws Exception {
+        SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100);
+        sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));
+        SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH)
+                .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg).execute().actionGet();
+        assertSearchResponse(response);
+        Sampler sample = response.getAggregations().get("sample");
+        Terms authors = sample.getAggregations().get("authors");
+        Collection<Bucket> testBuckets = authors.getBuckets();
+
+        long maxBooksPerAuthor = 0;
+        for (Terms.Bucket testBucket : testBuckets) {
+            maxBooksPerAuthor = Math.max(testBucket.getDocCount(), maxBooksPerAuthor);
+        }
+        assertThat(maxBooksPerAuthor, equalTo(3L));
+    }
+
+    @Test
+    public void simpleDiversity() throws Exception {
+        int MAX_DOCS_PER_AUTHOR = 1;
+        SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100);
+        sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint());
+        sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));
+        SearchResponse response = client().prepareSearch("test")
+                .setSearchType(SearchType.QUERY_AND_FETCH)
+                .setQuery(new TermQueryBuilder("genre", "fantasy"))
+                .setFrom(0).setSize(60)
+                .addAggregation(sampleAgg)
+                .execute()
+                .actionGet();
+        assertSearchResponse(response);
+        Sampler sample = response.getAggregations().get("sample");
+        Terms authors = sample.getAggregations().get("authors");
+        Collection<Bucket> testBuckets = authors.getBuckets();
+
+        for (Terms.Bucket testBucket : testBuckets) {
+            assertThat(testBucket.getDocCount(), lessThanOrEqualTo((long) NUM_SHARDS * MAX_DOCS_PER_AUTHOR));
+        }
+    }
+
+    @Test
+    public void nestedDiversity() throws Exception {
+        // Test multiple samples gathered under buckets made by a parent agg
+        int MAX_DOCS_PER_AUTHOR = 1;
+        TermsBuilder rootTerms = new TermsBuilder("genres").field("genre");
+
+        SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100);
+        sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint());
+        sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));
+
+        rootTerms.subAggregation(sampleAgg);
+        SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH)
+                .addAggregation(rootTerms).execute().actionGet();
+        assertSearchResponse(response);
+        Terms genres = response.getAggregations().get("genres");
+        Collection<Bucket> genreBuckets = genres.getBuckets();
+        for (Terms.Bucket genreBucket : genreBuckets) {
+            Sampler sample = genreBucket.getAggregations().get("sample");
+            Terms authors = sample.getAggregations().get("authors");
+            Collection<Bucket> testBuckets = authors.getBuckets();
+
+            for (Terms.Bucket testBucket : testBuckets) {
+                assertThat(testBucket.getDocCount(), lessThanOrEqualTo((long) NUM_SHARDS * MAX_DOCS_PER_AUTHOR));
+            }
+        }
+    }
+
+    @Test
+    public void nestedSamples() throws Exception {
+        // Test samples nested under samples
+        int MAX_DOCS_PER_AUTHOR = 1;
+        int MAX_DOCS_PER_GENRE = 2;
+        SamplerAggregationBuilder rootSample = new SamplerAggregationBuilder("genreSample").shardSize(100).field("genre")
+                .maxDocsPerValue(MAX_DOCS_PER_GENRE);
+
+        SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100);
+        sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint());
+        sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));
+        sampleAgg.subAggregation(new TermsBuilder("genres").field("genre"));
+
+        rootSample.subAggregation(sampleAgg);
+        SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_AND_FETCH).addAggregation(rootSample)
+                .execute().actionGet();
+        assertSearchResponse(response);
+        Sampler genreSample = response.getAggregations().get("genreSample");
+        Sampler sample = genreSample.getAggregations().get("sample");
+
+        Terms genres = sample.getAggregations().get("genres");
+        Collection<Bucket> testBuckets = genres.getBuckets();
+        for (Terms.Bucket testBucket : testBuckets) {
+            assertThat(testBucket.getDocCount(), lessThanOrEqualTo((long) NUM_SHARDS * MAX_DOCS_PER_GENRE));
+        }
+
+        Terms authors = sample.getAggregations().get("authors");
+        testBuckets = authors.getBuckets();
+        for (Terms.Bucket testBucket : testBuckets) {
+            assertThat(testBucket.getDocCount(), lessThanOrEqualTo((long) NUM_SHARDS * MAX_DOCS_PER_AUTHOR));
+        }
+    }
+
+    @Test
+    public void unmappedChildAggNoDiversity() throws Exception {
+        SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100);
+        sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));
+        SearchResponse response = client().prepareSearch("idx_unmapped")
+                .setSearchType(SearchType.QUERY_AND_FETCH)
+                .setQuery(new TermQueryBuilder("genre", "fantasy"))
+                .setFrom(0).setSize(60)
+                .addAggregation(sampleAgg)
+                .execute()
+                .actionGet();
+        assertSearchResponse(response);
+        Sampler sample = response.getAggregations().get("sample");
+        assertThat(sample.getDocCount(), equalTo(0L));
+        Terms authors = sample.getAggregations().get("authors");
+        assertThat(authors.getBuckets().size(), equalTo(0));
+    }
+
+    @Test
+    public void partiallyUnmappedChildAggNoDiversity() throws Exception {
+        SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100);
+        sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));
+        SearchResponse response = client().prepareSearch("idx_unmapped", "test")
+                .setSearchType(SearchType.QUERY_AND_FETCH)
+                .setQuery(new TermQueryBuilder("genre", "fantasy"))
+                .setFrom(0).setSize(60).setExplain(true)
+                .addAggregation(sampleAgg)
+                .execute()
+                .actionGet();
+        assertSearchResponse(response);
+        Sampler sample = response.getAggregations().get("sample");
+        assertThat(sample.getDocCount(), greaterThan(0L));
+        Terms authors = sample.getAggregations().get("authors");
+        assertThat(authors.getBuckets().size(), greaterThan(0));
+    }
+
+    @Test
+    public void partiallyUnmappedDiversifyField() throws Exception {
+        // One of the indices is missing the "author" field used for
+        // diversifying results
+        SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100).field("author").maxDocsPerValue(1);
+        sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));
+        SearchResponse response = client().prepareSearch("idx_unmapped_author", "test").setSearchType(SearchType.QUERY_AND_FETCH)
+                .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg)
+                .execute().actionGet();
+        assertSearchResponse(response);
+        Sampler sample = response.getAggregations().get("sample");
+        assertThat(sample.getDocCount(), greaterThan(0L));
+        Terms authors = sample.getAggregations().get("authors");
+        assertThat(authors.getBuckets().size(), greaterThan(0));
+    }
+
+    @Test
+    public void whollyUnmappedDiversifyField() throws Exception {
+        // All of the indices are missing the "author" field used for diversifying results
+        int MAX_DOCS_PER_AUTHOR = 1;
+        SamplerAggregationBuilder sampleAgg = new SamplerAggregationBuilder("sample").shardSize(100);
+        sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint());
+        sampleAgg.subAggregation(new TermsBuilder("authors").field("author"));
+        SearchResponse response = client().prepareSearch("idx_unmapped", "idx_unmapped_author").setSearchType(SearchType.QUERY_AND_FETCH)
+                .setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg).execute().actionGet();
+        assertSearchResponse(response);
+        Sampler sample = response.getAggregations().get("sample");
+        assertThat(sample.getDocCount(), equalTo(0L));
+        Terms authors = sample.getAggregations().get("authors");
+        assertNull(authors);
+    }
+
+}