Browse Source

Add missing_bucket option in the composite agg (#29465)

This change adds a new option to the composite aggregation named `missing_bucket`.
This option can be set by source and dictates whether documents without a value for the
source should be ignored. When set to true, documents without a value for a field emits
an explicit `null` value which is then added in the composite bucket.
The `missing` option that allows to set an explicit value (instead of `null`) is deprecated in this change and will be removed in a follow up (only in 7.x).
This commit also changes how the big arrays are allocated, instead of reserving
the provided `size` for all sources they are created with a small intial size and they grow
depending on the number of buckets created by the aggregation:
Closes #29380
Jim Ferenczi 7 years ago
parent
commit
e33d107f84
25 changed files with 890 additions and 224 deletions
  1. 28 0
      docs/reference/aggregations/bucket/composite-aggregation.asciidoc
  2. 29 0
      rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml
  3. 66 15
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java
  4. 68 0
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BitArray.java
  5. 1 6
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregation.java
  6. 3 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java
  7. 85 83
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java
  8. 23 29
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
  9. 47 5
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java
  10. 11 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
  11. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceParserHelper.java
  12. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
  13. 50 7
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java
  14. 19 10
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java
  15. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java
  16. 15 4
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
  17. 53 9
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java
  18. 11 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java
  19. 2 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java
  20. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java
  21. 54 0
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/BitArrayTests.java
  22. 9 0
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilderTests.java
  23. 168 3
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java
  24. 76 44
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java
  25. 68 1
      server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java

+ 28 - 0
docs/reference/aggregations/bucket/composite-aggregation.asciidoc

@@ -348,6 +348,34 @@ GET /_search
 \... will sort the composite bucket in descending order when comparing values from the `date_histogram` source
 and in ascending order when comparing values from the `terms` source.
 
+====== Missing bucket
+
+By default documents without a value for a given source are ignored.
+It is possible to include them in the response by setting `missing_bucket` to
+`true` (defaults to `false`):
+
+[source,js]
+--------------------------------------------------
+GET /_search
+{
+    "aggs" : {
+        "my_buckets": {
+            "composite" : {
+                "sources" : [
+                    { "product_name": { "terms" : { "field": "product", "missing_bucket": true } } }
+                ]
+            }
+        }
+     }
+}
+--------------------------------------------------
+// CONSOLE
+
+In the example above the source `product_name` will emit an explicit `null` value
+for documents without a value for the field `product`.
+The `order` specified in the source dictates whether the `null` values should rank
+first (ascending order, `asc`) or last (descending order, `desc`).
+
 ==== Size
 
 The `size` parameter can be set to define how many composite buckets should be returned.

+ 29 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml

@@ -323,3 +323,32 @@ setup:
   - length: { aggregations.test.buckets: 2 }
   - length: { aggregations.test.after_key: 1 }
   - match: { aggregations.test.after_key.keyword:  "foo" }
+
+---
+"Composite aggregation and array size":
+  - skip:
+      version: " - 6.99.99"
+      reason:  starting in 7.0 the composite sources do not allocate arrays eagerly.
+
+  - do:
+        search:
+          index: test
+          body:
+            aggregations:
+              test:
+                composite:
+                  size: 1000000000
+                  sources: [
+                    {
+                      "keyword": {
+                        "terms": {
+                          "field": "keyword",
+                        }
+                      }
+                    }
+                  ]
+
+  - match: {hits.total: 6}
+  - length: { aggregations.test.buckets: 2 }
+  - length: { aggregations.test.after_key: 1 }
+  - match: { aggregations.test.after_key.keyword:  "foo" }

+ 66 - 15
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java

@@ -24,49 +24,93 @@ import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
 import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
-import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.StringFieldType;
-import org.elasticsearch.index.mapper.TextFieldMapper;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 
 import java.io.IOException;
+import java.util.function.LongConsumer;
 
 /**
  * A {@link SingleDimensionValuesSource} for binary source ({@link BytesRef}).
  */
 class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
+    private final LongConsumer breakerConsumer;
     private final CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc;
-    private final BytesRef[] values;
+    private ObjectArray<BytesRef> values;
+    private ObjectArray<BytesRefBuilder> valueBuilders;
     private BytesRef currentValue;
 
-    BinaryValuesSource(MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
-                       DocValueFormat format, Object missing, int size, int reverseMul) {
-        super(format, fieldType, missing, size, reverseMul);
+    BinaryValuesSource(BigArrays bigArrays, LongConsumer breakerConsumer,
+                       MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedBinaryDocValues, IOException> docValuesFunc,
+                       DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) {
+        super(bigArrays, format, fieldType, missingBucket, missing, size, reverseMul);
+        this.breakerConsumer = breakerConsumer;
         this.docValuesFunc = docValuesFunc;
-        this.values = new BytesRef[size];
+        this.values = bigArrays.newObjectArray(Math.min(size, 100));
+        this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100));
     }
 
     @Override
-    public void copyCurrent(int slot) {
-        values[slot] = BytesRef.deepCopyOf(currentValue);
+    void copyCurrent(int slot) {
+        values =  bigArrays.grow(values, slot+1);
+        valueBuilders = bigArrays.grow(valueBuilders, slot+1);
+        BytesRefBuilder builder = valueBuilders.get(slot);
+        int byteSize = builder == null ? 0 : builder.bytes().length;
+        if (builder == null) {
+            builder = new BytesRefBuilder();
+            valueBuilders.set(slot, builder);
+        }
+        if (missingBucket && currentValue == null) {
+            values.set(slot, null);
+        } else {
+            assert currentValue != null;
+            builder.copyBytes(currentValue);
+            breakerConsumer.accept(builder.bytes().length - byteSize);
+            values.set(slot, builder.get());
+        }
     }
 
     @Override
-    public int compare(int from, int to) {
-        return compareValues(values[from], values[to]);
+    int compare(int from, int to) {
+        if (missingBucket) {
+            if (values.get(from) == null) {
+                return values.get(to) == null ? 0 : -1 * reverseMul;
+            } else if (values.get(to) == null) {
+                return reverseMul;
+            }
+        }
+        return compareValues(values.get(from), values.get(to));
     }
 
     @Override
     int compareCurrent(int slot) {
-        return compareValues(currentValue, values[slot]);
+        if (missingBucket) {
+            if (currentValue == null) {
+                return values.get(slot) == null ? 0 : -1 * reverseMul;
+            } else if (values.get(slot) == null) {
+                return reverseMul;
+            }
+        }
+        return compareValues(currentValue, values.get(slot));
     }
 
     @Override
     int compareCurrentWithAfter() {
+        if (missingBucket) {
+            if (currentValue == null) {
+                return afterValue == null ? 0 : -1 * reverseMul;
+            } else if (afterValue == null) {
+                return reverseMul;
+            }
+        }
         return compareValues(currentValue, afterValue);
     }
 
@@ -76,7 +120,9 @@ class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
 
     @Override
     void setAfter(Comparable<?> value) {
-        if (value.getClass() == String.class) {
+        if (missingBucket && value == null) {
+            afterValue = null;
+        } else if (value.getClass() == String.class) {
             afterValue = format.parseBytesRef(value.toString());
         } else {
             throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
@@ -85,7 +131,7 @@ class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
 
     @Override
     BytesRef toComparable(int slot) {
-        return values[slot];
+       return values.get(slot);
     }
 
     @Override
@@ -100,6 +146,9 @@ class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
                         currentValue = dvs.nextValue();
                         next.collect(doc, bucket);
                     }
+                } else if (missingBucket) {
+                    currentValue = null;
+                    next.collect(doc, bucket);
                 }
             }
         };
@@ -130,5 +179,7 @@ class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
     }
 
     @Override
-    public void close() {}
+    public void close() {
+        Releasables.close(values, valueBuilders);
+    }
 }

+ 68 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BitArray.java

@@ -0,0 +1,68 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.composite;
+
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.LongArray;
+
+/**
+ * A bit array that is implemented using a growing {@link LongArray}
+ * created from {@link BigArrays}.
+ * The underlying long array grows lazily based on the biggest index
+ * that needs to be set.
+ */
+final class BitArray implements Releasable {
+    private final BigArrays bigArrays;
+    private LongArray bits;
+
+    BitArray(BigArrays bigArrays, int initialSize) {
+        this.bigArrays = bigArrays;
+        this.bits = bigArrays.newLongArray(initialSize, true);
+    }
+
+    public void set(int index) {
+        fill(index, true);
+    }
+
+    public void clear(int index) {
+        fill(index, false);
+    }
+
+    public boolean get(int index) {
+        int wordNum = index >> 6;
+        long bitmask = 1L << index;
+        return (bits.get(wordNum) & bitmask) != 0;
+    }
+
+    private void fill(int index, boolean bit) {
+        int wordNum = index >> 6;
+        bits = bigArrays.grow(bits,wordNum+1);
+        long bitmask = 1L << index;
+        long value = bit ? bits.get(wordNum) | bitmask : bits.get(wordNum) & ~bitmask;
+        bits.set(wordNum, value);
+    }
+
+    @Override
+    public void close() {
+        Releasables.close(bits);
+    }
+}

+ 1 - 6
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregation.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.search.aggregations.bucket.composite;
 
-import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 
@@ -66,11 +65,7 @@ public interface CompositeAggregation extends MultiBucketsAggregation {
     static void buildCompositeMap(String fieldName, Map<String, Object> composite, XContentBuilder builder) throws IOException {
         builder.startObject(fieldName);
         for (Map.Entry<String, Object> entry : composite.entrySet()) {
-            if (entry.getValue().getClass() == BytesRef.class) {
-                builder.field(entry.getKey(), ((BytesRef) entry.getValue()).utf8ToString());
-            } else {
-                builder.field(entry.getKey(), entry.getValue());
-            }
+            builder.field(entry.getKey(), entry.getValue());
         }
         builder.endObject();
     }

+ 3 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java

@@ -170,7 +170,9 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
                     throw new IllegalArgumentException("Missing value for [after." + sources.get(i).name() + "]");
                 }
                 Object obj = after.get(sourceName);
-                if (obj instanceof Comparable) {
+                if (configs[i].missingBucket() && obj == null) {
+                    values[i] = null;
+                } else if (obj instanceof Comparable) {
                     values[i] = (Comparable<?>) obj;
                 } else {
                     throw new IllegalArgumentException("Invalid value for [after." + sources.get(i).name() +

+ 85 - 83
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

@@ -30,6 +30,7 @@ import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.RoaringDocIdSet;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.search.DocValueFormat;
@@ -50,6 +51,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.LongUnaryOperator;
 import java.util.stream.Collectors;
 
 final class CompositeAggregator extends BucketsAggregator {
@@ -59,9 +61,10 @@ final class CompositeAggregator extends BucketsAggregator {
     private final int[] reverseMuls;
     private final List<DocValueFormat> formats;
 
+    private final SingleDimensionValuesSource<?>[] sources;
     private final CompositeValuesCollectorQueue queue;
 
-    private final List<Entry> entries;
+    private final List<Entry> entries = new ArrayList<>();
     private LeafReaderContext currentLeaf;
     private RoaringDocIdSet.Builder docIdSetBuilder;
     private BucketCollector deferredCollectors;
@@ -74,19 +77,19 @@ final class CompositeAggregator extends BucketsAggregator {
         this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
         this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
         this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
-        final SingleDimensionValuesSource<?>[] sources =
-            createValuesSources(context.bigArrays(), context.searcher().getIndexReader(), context.query(), sourceConfigs, size);
-        this.queue = new CompositeValuesCollectorQueue(sources, size);
-        this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query());
-        if (rawAfterKey != null) {
-            queue.setAfter(rawAfterKey.values());
+        this.sources = new SingleDimensionValuesSource[sourceConfigs.length];
+        for (int i = 0; i < sourceConfigs.length; i++) {
+            this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(),
+                context.query(), sourceConfigs[i], size, i);
         }
-        this.entries = new ArrayList<>();
+        this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
+        this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query());
     }
 
     @Override
     protected void doClose() {
         Releasables.close(queue);
+        Releasables.close(sources);
     }
 
     @Override
@@ -256,94 +259,93 @@ final class CompositeAggregator extends BucketsAggregator {
         };
     }
 
-    private static SingleDimensionValuesSource<?>[] createValuesSources(BigArrays bigArrays, IndexReader reader, Query query,
-                                                                        CompositeValuesSourceConfig[] configs, int size) {
-        final SingleDimensionValuesSource<?>[] sources = new SingleDimensionValuesSource[configs.length];
-        for (int i = 0; i < sources.length; i++) {
-            final int reverseMul = configs[i].reverseMul();
-            if (configs[i].valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) {
-                ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) configs[i].valuesSource();
-                sources[i] = new GlobalOrdinalValuesSource(
+    private SingleDimensionValuesSource<?> createValuesSource(BigArrays bigArrays, IndexReader reader, Query query,
+                                                              CompositeValuesSourceConfig config, int sortRank, int size) {
+
+        final int reverseMul = config.reverseMul();
+        if (config.valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) {
+            ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) config.valuesSource();
+            SingleDimensionValuesSource<?> source = new GlobalOrdinalValuesSource(
+                bigArrays,
+                config.fieldType(),
+                vs::globalOrdinalsValues,
+                config.format(),
+                config.missingBucket(),
+                config.missing(),
+                size,
+                reverseMul
+            );
+
+            if (sortRank == 0 && source.createSortedDocsProducerOrNull(reader, query) != null) {
+                // this the leading source and we can optimize it with the sorted docs producer but
+                // we don't want to use global ordinals because the number of visited documents
+                // should be low and global ordinals need one lookup per visited term.
+                Releasables.close(source);
+                return new BinaryValuesSource(
                     bigArrays,
-                    configs[i].fieldType(),
-                    vs::globalOrdinalsValues,
-                    configs[i].format(),
-                    configs[i].missing(),
+                    this::addRequestCircuitBreakerBytes,
+                    config.fieldType(),
+                    vs::bytesValues,
+                    config.format(),
+                    config.missingBucket(),
+                    config.missing(),
                     size,
                     reverseMul
                 );
+            } else {
+                return source;
+            }
+        } else if (config.valuesSource() instanceof ValuesSource.Bytes) {
+            ValuesSource.Bytes vs = (ValuesSource.Bytes) config.valuesSource();
+            return new BinaryValuesSource(
+                bigArrays,
+                this::addRequestCircuitBreakerBytes,
+                config.fieldType(),
+                vs::bytesValues,
+                config.format(),
+                config.missingBucket(),
+                config.missing(),
+                size,
+                reverseMul
+            );
 
-                if (i == 0 && sources[i].createSortedDocsProducerOrNull(reader, query) != null) {
-                    // this the leading source and we can optimize it with the sorted docs producer but
-                    // we don't want to use global ordinals because the number of visited documents
-                    // should be low and global ordinals need one lookup per visited term.
-                    Releasables.close(sources[i]);
-                    sources[i] = new BinaryValuesSource(
-                        configs[i].fieldType(),
-                        vs::bytesValues,
-                        configs[i].format(),
-                        configs[i].missing(),
-                        size,
-                        reverseMul
-                    );
-                }
-            } else if (configs[i].valuesSource() instanceof ValuesSource.Bytes) {
-                ValuesSource.Bytes vs = (ValuesSource.Bytes) configs[i].valuesSource();
-                sources[i] = new BinaryValuesSource(
-                    configs[i].fieldType(),
-                    vs::bytesValues,
-                    configs[i].format(),
-                    configs[i].missing(),
+        } else if (config.valuesSource() instanceof ValuesSource.Numeric) {
+            final ValuesSource.Numeric vs = (ValuesSource.Numeric) config.valuesSource();
+            if (vs.isFloatingPoint()) {
+                return new DoubleValuesSource(
+                    bigArrays,
+                    config.fieldType(),
+                    vs::doubleValues,
+                    config.format(),
+                    config.missingBucket(),
+                    config.missing(),
                     size,
                     reverseMul
                 );
 
-            } else if (configs[i].valuesSource() instanceof ValuesSource.Numeric) {
-                final ValuesSource.Numeric vs = (ValuesSource.Numeric) configs[i].valuesSource();
-                if (vs.isFloatingPoint()) {
-                    sources[i] = new DoubleValuesSource(
-                        bigArrays,
-                        configs[i].fieldType(),
-                        vs::doubleValues,
-                        configs[i].format(),
-                        configs[i].missing(),
-                        size,
-                        reverseMul
-                    );
-
+            } else {
+                final LongUnaryOperator rounding;
+                if (vs instanceof RoundingValuesSource) {
+                    rounding = ((RoundingValuesSource) vs)::round;
                 } else {
-                    if (vs instanceof RoundingValuesSource) {
-                        sources[i] = new LongValuesSource(
-                            bigArrays,
-                            configs[i].fieldType(),
-                            vs::longValues,
-                            ((RoundingValuesSource) vs)::round,
-                            configs[i].format(),
-                            configs[i].missing(),
-                            size,
-                            reverseMul
-                        );
-
-                    } else {
-                        sources[i] = new LongValuesSource(
-                            bigArrays,
-                            configs[i].fieldType(),
-                            vs::longValues,
-                            (value) -> value,
-                            configs[i].format(),
-                            configs[i].missing(),
-                            size,
-                            reverseMul
-                        );
-
-                    }
+                    rounding = LongUnaryOperator.identity();
                 }
-            } else {
-                throw new IllegalArgumentException("Unknown value source: " + configs[i].valuesSource().getClass().getName() +
-                    " for field: " + sources[i].fieldType.name());
+                return new LongValuesSource(
+                    bigArrays,
+                    config.fieldType(),
+                    vs::longValues,
+                    rounding,
+                    config.format(),
+                    config.missingBucket(),
+                    config.missing(),
+                    size,
+                    reverseMul
+                );
             }
+        } else {
+            throw new IllegalArgumentException("Unknown values source type: " + config.valuesSource().getClass().getName() +
+                " for source: " + config.name());
         }
-        return sources;
     }
 
     private static class Entry {

+ 23 - 29
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java

@@ -22,10 +22,11 @@ package org.elasticsearch.search.aggregations.bucket.composite;
 import org.apache.lucene.index.LeafReaderContext;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -36,29 +37,33 @@ final class CompositeValuesCollectorQueue implements Releasable {
     // the slot for the current candidate
     private static final int CANDIDATE_SLOT = Integer.MAX_VALUE;
 
+    private final BigArrays bigArrays;
     private final int maxSize;
     private final TreeMap<Integer, Integer> keys;
     private final SingleDimensionValuesSource<?>[] arrays;
-    private final int[] docCounts;
-    private boolean afterValueSet = false;
+    private IntArray docCounts;
+    private boolean afterKeyIsSet = false;
 
     /**
      * Constructs a composite queue with the specified size and sources.
      *
      * @param sources The list of {@link CompositeValuesSourceConfig} to build the composite buckets.
      * @param size The number of composite buckets to keep.
+     * @param afterKey
      */
-    CompositeValuesCollectorQueue(SingleDimensionValuesSource<?>[] sources, int size) {
+    CompositeValuesCollectorQueue(BigArrays bigArrays, SingleDimensionValuesSource<?>[] sources, int size, CompositeKey afterKey) {
+        this.bigArrays = bigArrays;
         this.maxSize = size;
         this.arrays = sources;
-        this.docCounts = new int[size];
         this.keys = new TreeMap<>(this::compare);
-    }
-
-    void clear() {
-        keys.clear();
-        Arrays.fill(docCounts, 0);
-        afterValueSet = false;
+        if (afterKey != null) {
+            assert afterKey.size() == sources.length;
+            afterKeyIsSet = true;
+            for (int i = 0; i < afterKey.size(); i++) {
+                sources[i].setAfter(afterKey.get(i));
+            }
+        }
+        this.docCounts = bigArrays.newIntArray(1, false);
     }
 
     /**
@@ -94,7 +99,7 @@ final class CompositeValuesCollectorQueue implements Releasable {
      * Returns the lowest value (exclusive) of the leading source.
      */
     Comparable<?> getLowerValueLeadSource() {
-        return afterValueSet ? arrays[0].getAfter() : null;
+        return afterKeyIsSet ? arrays[0].getAfter() : null;
     }
 
     /**
@@ -107,7 +112,7 @@ final class CompositeValuesCollectorQueue implements Releasable {
      * Returns the document count in <code>slot</code>.
      */
     int getDocCount(int slot) {
-        return docCounts[slot];
+        return docCounts.get(slot);
     }
 
     /**
@@ -117,7 +122,8 @@ final class CompositeValuesCollectorQueue implements Releasable {
         for (int i = 0; i < arrays.length; i++) {
             arrays[i].copyCurrent(slot);
         }
-        docCounts[slot] = 1;
+        docCounts = bigArrays.grow(docCounts, slot+1);
+        docCounts.set(slot, 1);
     }
 
     /**
@@ -134,17 +140,6 @@ final class CompositeValuesCollectorQueue implements Releasable {
         return 0;
     }
 
-    /**
-     * Sets the after values for this comparator.
-     */
-    void setAfter(Comparable<?>[] values) {
-        assert values.length == arrays.length;
-        afterValueSet = true;
-        for (int i = 0; i < arrays.length; i++) {
-            arrays[i].setAfter(values[i]);
-        }
-    }
-
     /**
      * Compares the after values with the values in <code>slot</code>.
      */
@@ -207,10 +202,10 @@ final class CompositeValuesCollectorQueue implements Releasable {
         Integer topSlot = compareCurrent();
         if (topSlot != null) {
             // this key is already in the top N, skip it
-            docCounts[topSlot] += 1;
+            docCounts.increment(topSlot, 1);
             return topSlot;
         }
-        if (afterValueSet && compareCurrentWithAfter() <= 0) {
+        if (afterKeyIsSet && compareCurrentWithAfter() <= 0) {
             // this key is greater than the top value collected in the previous round, skip it
             return -1;
         }
@@ -239,9 +234,8 @@ final class CompositeValuesCollectorQueue implements Releasable {
         return newSlot;
     }
 
-
     @Override
     public void close() {
-        Releasables.close(arrays);
+        Releasables.close(docCounts);
     }
 }

+ 47 - 5
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java

@@ -23,6 +23,8 @@ import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.logging.DeprecationLogger;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.QueryShardException;
@@ -40,10 +42,14 @@ import java.util.Objects;
  * A {@link ValuesSource} builder for {@link CompositeAggregationBuilder}
  */
 public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSourceBuilder<AB>> implements Writeable, ToXContentFragment {
+    private static final DeprecationLogger DEPRECATION_LOGGER =
+        new DeprecationLogger(Loggers.getLogger(CompositeValuesSourceBuilder.class));
+
     protected final String name;
     private String field = null;
     private Script script = null;
     private ValueType valueType = null;
+    private boolean missingBucket = false;
     private Object missing = null;
     private SortOrder order = SortOrder.ASC;
     private String format = null;
@@ -66,6 +72,11 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
         if (in.readBoolean()) {
             this.valueType = ValueType.readFromStream(in);
         }
+        if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            this.missingBucket = in.readBoolean();
+        } else {
+            this.missingBucket = false;
+        }
         this.missing = in.readGenericValue();
         this.order = SortOrder.readFromStream(in);
         if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
@@ -89,6 +100,9 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
         if (hasValueType) {
             valueType.writeTo(out);
         }
+        if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            out.writeBoolean(missingBucket);
+        }
         out.writeGenericValue(missing);
         order.writeTo(out);
         if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
@@ -110,6 +124,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
         if (script != null) {
             builder.field("script", script);
         }
+        builder.field("missing_bucket", missingBucket);
         if (missing != null) {
             builder.field("missing", missing);
         }
@@ -127,7 +142,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
 
     @Override
     public final int hashCode() {
-        return Objects.hash(field, missing, script, valueType, order, format, innerHashCode());
+        return Objects.hash(field, missingBucket, missing, script, valueType, order, format, innerHashCode());
     }
 
     protected abstract int innerHashCode();
@@ -142,6 +157,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
         return Objects.equals(field, that.field()) &&
             Objects.equals(script, that.script()) &&
             Objects.equals(valueType, that.valueType()) &&
+            Objects.equals(missingBucket, that.missingBucket()) &&
             Objects.equals(missing, that.missing()) &&
             Objects.equals(order, that.order()) &&
             Objects.equals(format, that.format()) &&
@@ -215,21 +231,43 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
 
     /**
      * Sets the value to use when the source finds a missing value in a
-     * document
+     * document.
+     *
+     * @deprecated Use {@link #missingBucket(boolean)} instead.
      */
     @SuppressWarnings("unchecked")
+    @Deprecated
     public AB missing(Object missing) {
         if (missing == null) {
             throw new IllegalArgumentException("[missing] must not be null");
         }
+        DEPRECATION_LOGGER.deprecated("[missing] is deprecated. Please use [missing_bucket] instead.");
         this.missing = missing;
         return (AB) this;
     }
 
+    @Deprecated
     public Object missing() {
         return missing;
     }
 
+    /**
+     * If true an explicit `null bucket will represent documents with missing values.
+     */
+    @SuppressWarnings("unchecked")
+    public AB missingBucket(boolean missingBucket) {
+        this.missingBucket = missingBucket;
+        return (AB) this;
+    }
+
+    /**
+     * False if documents with missing values are ignored, otherwise missing values are
+     * represented by an explicit `null` value.
+     */
+    public boolean missingBucket() {
+        return missingBucket;
+    }
+
     /**
      * Sets the {@link SortOrder} to use to sort values produced this source
      */
@@ -292,11 +330,15 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
         ValuesSourceConfig<?> config = ValuesSourceConfig.resolve(context.getQueryShardContext(),
             valueType, field, script, missing, null, format);
 
-        if (config.unmapped() && field != null && config.missing() == null) {
+        if (config.unmapped() && field != null && missing == null && missingBucket == false) {
             // this source cannot produce any values so we refuse to build
-            // since composite buckets are not created on null values
+            // since composite buckets are not created on null values by default.
+            throw new QueryShardException(context.getQueryShardContext(),
+                "failed to find field [" + field + "] and [missing_bucket] is not set");
+        }
+        if (missingBucket && missing != null) {
             throw new QueryShardException(context.getQueryShardContext(),
-                "failed to find field [" + field + "] and [missing] is not provided");
+                "cannot use [missing] option in conjunction with [missing_bucket]");
         }
         return innerBuild(context, config);
     }

+ 11 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java

@@ -33,6 +33,7 @@ class CompositeValuesSourceConfig {
     private final DocValueFormat format;
     private final int reverseMul;
     private final Object missing;
+    private final boolean missingBucket;
 
     /**
      * Creates a new {@link CompositeValuesSourceConfig}.
@@ -44,12 +45,14 @@ class CompositeValuesSourceConfig {
      * @param missing The missing value or null if documents with missing value should be ignored.
      */
     CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format,
-                                SortOrder order, @Nullable Object missing) {
+                                SortOrder order, boolean missingBucket, @Nullable Object missing) {
         this.name = name;
         this.fieldType = fieldType;
         this.vs = vs;
         this.format = format;
         this.reverseMul = order == SortOrder.ASC ? 1 : -1;
+        this.missingBucket = missingBucket;
+        assert missingBucket == false || missing == null;
         this.missing = missing;
     }
 
@@ -89,6 +92,13 @@ class CompositeValuesSourceConfig {
         return missing;
     }
 
+    /**
+     * If true, an explicit `null bucket represents documents with missing values.
+     */
+    boolean missingBucket() {
+        return missingBucket;
+    }
+
     /**
      * The sort order for the values source (e.g. -1 for descending and 1 for ascending).
      */

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceParserHelper.java

@@ -38,9 +38,9 @@ class CompositeValuesSourceParserHelper {
                                                                                            ValueType targetValueType) {
         objectParser.declareField(VB::field, XContentParser::text,
             new ParseField("field"), ObjectParser.ValueType.STRING);
-
         objectParser.declareField(VB::missing, XContentParser::objectText,
             new ParseField("missing"), ObjectParser.ValueType.VALUE);
+        objectParser.declareBoolean(VB::missingBucket, new ParseField("missing_bucket"));
 
         objectParser.declareField(VB::valueType, p -> {
             ValueType valueType = ValueType.resolveForScript(p.text());

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java

@@ -226,7 +226,7 @@ public class DateHistogramValuesSourceBuilder extends CompositeValuesSourceBuild
             // is specified in the builder.
             final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format();
             final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
-            return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), missing());
+            return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), missingBucket(), missing());
         } else {
             throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
         }

+ 50 - 7
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java

@@ -38,34 +38,67 @@ import java.io.IOException;
  */
 class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
     private final CheckedFunction<LeafReaderContext, SortedNumericDoubleValues, IOException> docValuesFunc;
-    private final DoubleArray values;
+    private final BitArray bits;
+    private DoubleArray values;
     private double currentValue;
+    private boolean missingCurrentValue;
 
     DoubleValuesSource(BigArrays bigArrays, MappedFieldType fieldType,
                        CheckedFunction<LeafReaderContext, SortedNumericDoubleValues, IOException> docValuesFunc,
-                       DocValueFormat format, Object missing, int size, int reverseMul) {
-        super(format, fieldType, missing, size, reverseMul);
+                       DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) {
+        super(bigArrays, format, fieldType, missingBucket, missing, size, reverseMul);
         this.docValuesFunc = docValuesFunc;
-        this.values = bigArrays.newDoubleArray(size, false);
+        this.bits = missingBucket ? new BitArray(bigArrays, 100) : null;
+        this.values = bigArrays.newDoubleArray(Math.min(size, 100), false);
     }
 
     @Override
     void copyCurrent(int slot) {
-        values.set(slot, currentValue);
+        values = bigArrays.grow(values, slot+1);
+        if (missingBucket && missingCurrentValue) {
+            bits.clear(slot);
+        } else {
+            assert missingCurrentValue == false;
+            if (missingBucket) {
+                bits.set(slot);
+            }
+            values.set(slot, currentValue);
+        }
     }
 
     @Override
     int compare(int from, int to) {
+        if (missingBucket) {
+            if (bits.get(from) == false) {
+                return bits.get(to) ? -1 * reverseMul : 0;
+            } else if (bits.get(to) == false) {
+                return reverseMul;
+            }
+        }
         return compareValues(values.get(from), values.get(to));
     }
 
     @Override
     int compareCurrent(int slot) {
+        if (missingBucket) {
+            if (missingCurrentValue) {
+                return bits.get(slot) ? -1 * reverseMul : 0;
+            } else if (bits.get(slot) == false) {
+                return reverseMul;
+            }
+        }
         return compareValues(currentValue, values.get(slot));
     }
 
     @Override
     int compareCurrentWithAfter() {
+        if (missingBucket) {
+            if (missingCurrentValue) {
+                return afterValue != null ? -1 * reverseMul : 0;
+            } else if (afterValue == null) {
+                return reverseMul;
+            }
+        }
         return compareValues(currentValue, afterValue);
     }
 
@@ -75,7 +108,9 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
 
     @Override
     void setAfter(Comparable<?> value) {
-        if (value instanceof Number) {
+        if (missingBucket && value == null) {
+            afterValue = null;
+        } else if (value instanceof Number) {
             afterValue = ((Number) value).doubleValue();
         } else {
             afterValue = format.parseDouble(value.toString(), false, () -> {
@@ -86,6 +121,10 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
 
     @Override
     Double toComparable(int slot) {
+        if (missingBucket && bits.get(slot) == false) {
+            return null;
+        }
+        assert missingBucket == false || bits.get(slot);
         return values.get(slot);
     }
 
@@ -99,8 +138,12 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
                     int num = dvs.docValueCount();
                     for (int i = 0; i < num; i++) {
                         currentValue = dvs.nextValue();
+                        missingCurrentValue = false;
                         next.collect(doc, bucket);
                     }
+                } else if (missingBucket) {
+                    missingCurrentValue = true;
+                    next.collect(doc, bucket);
                 }
             }
         };
@@ -127,6 +170,6 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
 
     @Override
     public void close() {
-        Releasables.close(values);
+        Releasables.close(values, bits);
     }
 }

+ 19 - 10
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java

@@ -43,7 +43,7 @@ import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS;
  */
 class GlobalOrdinalValuesSource extends SingleDimensionValuesSource<BytesRef> {
     private final CheckedFunction<LeafReaderContext, SortedSetDocValues, IOException> docValuesFunc;
-    private final LongArray values;
+    private LongArray values;
     private SortedSetDocValues lookup;
     private long currentValue;
     private Long afterValueGlobalOrd;
@@ -52,16 +52,17 @@ class GlobalOrdinalValuesSource extends SingleDimensionValuesSource<BytesRef> {
     private long lastLookupOrd = -1;
     private BytesRef lastLookupValue;
 
-    GlobalOrdinalValuesSource(BigArrays bigArrays,
-                              MappedFieldType type, CheckedFunction<LeafReaderContext, SortedSetDocValues, IOException> docValuesFunc,
-                              DocValueFormat format, Object missing, int size, int reverseMul) {
-        super(format, type, missing, size, reverseMul);
+    GlobalOrdinalValuesSource(BigArrays bigArrays, MappedFieldType type,
+                              CheckedFunction<LeafReaderContext, SortedSetDocValues, IOException> docValuesFunc,
+                              DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) {
+        super(bigArrays, format, type, missingBucket, missing, size, reverseMul);
         this.docValuesFunc = docValuesFunc;
-        this.values = bigArrays.newLongArray(size, false);
+        this.values = bigArrays.newLongArray(Math.min(size, 100), false);
     }
 
     @Override
     void copyCurrent(int slot) {
+        values = bigArrays.grow(values, slot+1);
         values.set(slot, currentValue);
     }
 
@@ -89,7 +90,10 @@ class GlobalOrdinalValuesSource extends SingleDimensionValuesSource<BytesRef> {
 
     @Override
     void setAfter(Comparable<?> value) {
-        if (value.getClass() == String.class) {
+        if (missingBucket && value == null) {
+            afterValue = null;
+            afterValueGlobalOrd = -1L;
+        } else if (value.getClass() == String.class) {
             afterValue = format.parseBytesRef(value.toString());
         } else {
             throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName());
@@ -99,10 +103,12 @@ class GlobalOrdinalValuesSource extends SingleDimensionValuesSource<BytesRef> {
     @Override
     BytesRef toComparable(int slot) throws IOException {
         long globalOrd = values.get(slot);
-        if (globalOrd == lastLookupOrd) {
+        if (missingBucket && globalOrd == -1) {
+            return null;
+        } else if (globalOrd == lastLookupOrd) {
             return lastLookupValue;
         } else {
-            lastLookupOrd= globalOrd;
+            lastLookupOrd = globalOrd;
             lastLookupValue = BytesRef.deepCopyOf(lookup.lookupOrd(values.get(slot)));
             return lastLookupValue;
         }
@@ -123,6 +129,9 @@ class GlobalOrdinalValuesSource extends SingleDimensionValuesSource<BytesRef> {
                         currentValue = ord;
                         next.collect(doc, bucket);
                     }
+                } else if (missingBucket) {
+                    currentValue = -1;
+                    next.collect(doc, bucket);
                 }
             }
         };
@@ -143,7 +152,7 @@ class GlobalOrdinalValuesSource extends SingleDimensionValuesSource<BytesRef> {
 
             @Override
             public void collect(int doc, long bucket) throws IOException {
-                if (!currentValueIsSet) {
+                if (currentValueIsSet == false) {
                     if (dvs.advanceExact(doc)) {
                         long ord;
                         while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) {

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java

@@ -115,7 +115,7 @@ public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<H
             ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
             final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval);
             final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null;
-            return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(), missing());
+            return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(), missingBucket(), missing());
         } else {
             throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
         }

+ 15 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

@@ -332,6 +332,14 @@ public class InternalComposite
         @Override
         public int compareKey(InternalBucket other) {
             for (int i = 0; i < key.size(); i++) {
+                if (key.get(i) == null) {
+                    if (other.key.get(i) == null) {
+                        continue;
+                    }
+                    return -1 * reverseMuls[i];
+                } else if (other.key.get(i) == null) {
+                    return reverseMuls[i];
+                }
                 assert key.get(i).getClass() == other.key.get(i).getClass();
                 @SuppressWarnings("unchecked")
                 int cmp = ((Comparable) key.get(i)).compareTo(other.key.get(i)) * reverseMuls[i];
@@ -357,26 +365,29 @@ public class InternalComposite
      * for numbers and a string for {@link BytesRef}s.
      */
     static Object formatObject(Object obj, DocValueFormat format) {
+        if (obj == null) {
+            return null;
+        }
         if (obj.getClass() == BytesRef.class) {
             BytesRef value = (BytesRef) obj;
             if (format == DocValueFormat.RAW) {
                 return value.utf8ToString();
             } else {
-                return format.format((BytesRef) obj);
+                return format.format(value);
             }
         } else if (obj.getClass() == Long.class) {
-            Long value = (Long) obj;
+            long value = (long) obj;
             if (format == DocValueFormat.RAW) {
                 return value;
             } else {
                 return format.format(value);
             }
         } else if (obj.getClass() == Double.class) {
-            Double value = (Double) obj;
+            double value = (double) obj;
             if (format == DocValueFormat.RAW) {
                 return value;
             } else {
-                return format.format((Double) obj);
+                return format.format(value);
             }
         }
         return obj;

+ 53 - 9
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java

@@ -45,38 +45,73 @@ import java.util.function.ToLongFunction;
  * A {@link SingleDimensionValuesSource} for longs.
  */
 class LongValuesSource extends SingleDimensionValuesSource<Long> {
+    private final BigArrays bigArrays;
     private final CheckedFunction<LeafReaderContext, SortedNumericDocValues, IOException> docValuesFunc;
     private final LongUnaryOperator rounding;
 
-    private final LongArray values;
+    private BitArray bits;
+    private LongArray values;
     private long currentValue;
+    private boolean missingCurrentValue;
 
-    LongValuesSource(BigArrays bigArrays, MappedFieldType fieldType,
-                     CheckedFunction<LeafReaderContext, SortedNumericDocValues, IOException> docValuesFunc,
-                     LongUnaryOperator rounding, DocValueFormat format, Object missing, int size, int reverseMul) {
-        super(format, fieldType, missing, size, reverseMul);
+    LongValuesSource(BigArrays bigArrays,
+                     MappedFieldType fieldType, CheckedFunction<LeafReaderContext, SortedNumericDocValues, IOException> docValuesFunc,
+                     LongUnaryOperator rounding, DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) {
+        super(bigArrays, format, fieldType, missingBucket, missing, size, reverseMul);
+        this.bigArrays = bigArrays;
         this.docValuesFunc = docValuesFunc;
         this.rounding = rounding;
-        this.values = bigArrays.newLongArray(size, false);
+        this.bits = missingBucket ? new BitArray(bigArrays, Math.min(size, 100)) : null;
+        this.values = bigArrays.newLongArray(Math.min(size, 100), false);
     }
 
     @Override
     void copyCurrent(int slot) {
-        values.set(slot, currentValue);
+        values = bigArrays.grow(values, slot+1);
+        if (missingBucket && missingCurrentValue) {
+            bits.clear(slot);
+        } else {
+            assert missingCurrentValue == false;
+            if (missingBucket) {
+                bits.set(slot);
+            }
+            values.set(slot, currentValue);
+        }
     }
 
     @Override
     int compare(int from, int to) {
+        if (missingBucket) {
+            if (bits.get(from) == false) {
+                return bits.get(to) ? -1 * reverseMul : 0;
+            } else if (bits.get(to) == false) {
+                return reverseMul;
+            }
+        }
         return compareValues(values.get(from), values.get(to));
     }
 
     @Override
     int compareCurrent(int slot) {
+        if (missingBucket) {
+            if (missingCurrentValue) {
+                return bits.get(slot) ? -1 * reverseMul : 0;
+            } else if (bits.get(slot) == false) {
+                return reverseMul;
+            }
+        }
         return compareValues(currentValue, values.get(slot));
     }
 
     @Override
     int compareCurrentWithAfter() {
+        if (missingBucket) {
+            if (missingCurrentValue) {
+                return afterValue != null ? -1 * reverseMul : 0;
+            } else if (afterValue == null) {
+                return reverseMul;
+            }
+        }
         return compareValues(currentValue, afterValue);
     }
 
@@ -86,7 +121,9 @@ class LongValuesSource extends SingleDimensionValuesSource<Long> {
 
     @Override
     void setAfter(Comparable<?> value) {
-        if (value instanceof Number) {
+        if (missingBucket && value == null) {
+            afterValue = null;
+        } else if (value instanceof Number) {
             afterValue = ((Number) value).longValue();
         } else {
             // for date histogram source with "format", the after value is formatted
@@ -99,6 +136,9 @@ class LongValuesSource extends SingleDimensionValuesSource<Long> {
 
     @Override
     Long toComparable(int slot) {
+        if (missingBucket && bits.get(slot) == false) {
+            return null;
+        }
         return values.get(slot);
     }
 
@@ -112,8 +152,12 @@ class LongValuesSource extends SingleDimensionValuesSource<Long> {
                     int num = dvs.docValueCount();
                     for (int i = 0; i < num; i++) {
                         currentValue = dvs.nextValue();
+                        missingCurrentValue = false;
                         next.collect(doc, bucket);
                     }
+                } else if (missingBucket) {
+                    missingCurrentValue = true;
+                    next.collect(doc, bucket);
                 }
             }
         };
@@ -182,6 +226,6 @@ class LongValuesSource extends SingleDimensionValuesSource<Long> {
 
     @Override
     public void close() {
-        Releasables.close(values);
+        Releasables.close(values, bits);
     }
 }

+ 11 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java

@@ -25,6 +25,7 @@ import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -36,11 +37,13 @@ import java.io.IOException;
  * A source that can record and compare values of similar type.
  */
 abstract class SingleDimensionValuesSource<T extends Comparable<T>> implements Releasable {
+    protected final BigArrays bigArrays;
     protected final DocValueFormat format;
     @Nullable
     protected final MappedFieldType fieldType;
     @Nullable
     protected final Object missing;
+    protected final boolean missingBucket;
 
     protected final int size;
     protected final int reverseMul;
@@ -50,17 +53,23 @@ abstract class SingleDimensionValuesSource<T extends Comparable<T>> implements R
     /**
      * Creates a new {@link SingleDimensionValuesSource}.
      *
+     * @param bigArrays The big arrays object.
      * @param format The format of the source.
      * @param fieldType The field type or null if the source is a script.
+     * @param missingBucket If true, an explicit `null bucket represents documents with missing values.
      * @param missing The missing value or null if documents with missing value should be ignored.
      * @param size The number of values to record.
      * @param reverseMul -1 if the natural order ({@link SortOrder#ASC} should be reversed.
      */
-    SingleDimensionValuesSource(DocValueFormat format, @Nullable MappedFieldType fieldType, @Nullable Object missing,
+    SingleDimensionValuesSource(BigArrays bigArrays, DocValueFormat format,
+                                @Nullable MappedFieldType fieldType, boolean missingBucket, @Nullable Object missing,
                                 int size, int reverseMul) {
+        assert missing == null || missingBucket == false;
+        this.bigArrays = bigArrays;
         this.format = format;
         this.fieldType = fieldType;
         this.missing = missing;
+        this.missingBucket = missingBucket;
         this.size = size;
         this.reverseMul = reverseMul;
         this.afterValue = null;
@@ -139,6 +148,7 @@ abstract class SingleDimensionValuesSource<T extends Comparable<T>> implements R
     protected boolean checkIfSortedDocsIsApplicable(IndexReader reader, MappedFieldType fieldType) {
         if (fieldType == null ||
                 missing != null ||
+                (missingBucket && afterValue == null) ||
                 fieldType.indexOptions() == IndexOptions.NONE ||
                 // inverse of the natural order
                 reverseMul == -1) {

+ 2 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java

@@ -61,8 +61,9 @@ class TermsSortedDocsProducer extends SortedDocsProducer {
         DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), terms) : null;
         PostingsEnum reuse = null;
         boolean first = true;
+        final BytesRef upper = upperValue == null ? null : BytesRef.deepCopyOf(upperValue);
         do {
-            if (upperValue != null && upperValue.compareTo(te.term()) < 0) {
+            if (upper != null && upper.compareTo(te.term()) < 0) {
                 break;
             }
             reuse = te.postings(reuse, PostingsEnum.NONE);

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java

@@ -93,6 +93,6 @@ public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder<Terms
         } else {
             format = config.format();
         }
-        return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missing());
+        return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missingBucket(), missing());
     }
 }

+ 54 - 0
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/BitArrayTests.java

@@ -0,0 +1,54 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.composite;
+
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class BitArrayTests extends ESTestCase {
+    public void testRandom() {
+        try (BitArray bitArray = new BitArray(BigArrays.NON_RECYCLING_INSTANCE, 1)) {
+            int numBits = randomIntBetween(1000, 10000);
+            for (int step = 0; step < 3; step++) {
+                boolean[] bits = new boolean[numBits];
+                List<Integer> slots = new ArrayList<>();
+                for (int i = 0; i < numBits; i++) {
+                    bits[i] = randomBoolean();
+                    slots.add(i);
+                }
+                Collections.shuffle(slots, random());
+                for (int i : slots) {
+                    if (bits[i]) {
+                        bitArray.set(i);
+                    } else {
+                        bitArray.clear(i);
+                    }
+                }
+                for (int i = 0; i < numBits; i++) {
+                    assertEquals(bitArray.get(i), bits[i]);
+                }
+            }
+        }
+    }
+}

+ 9 - 0
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilderTests.java

@@ -44,6 +44,9 @@ public class CompositeAggregationBuilderTests extends BaseAggregationTestCase<Co
         if (randomBoolean()) {
             histo.timeZone(randomDateTimeZone());
         }
+        if (randomBoolean()) {
+            histo.missingBucket(true);
+        }
         return histo;
     }
 
@@ -55,6 +58,9 @@ public class CompositeAggregationBuilderTests extends BaseAggregationTestCase<Co
             terms.script(new Script(randomAlphaOfLengthBetween(10, 20)));
         }
         terms.order(randomFrom(SortOrder.values()));
+        if (randomBoolean()) {
+            terms.missingBucket(true);
+        }
         return terms;
     }
 
@@ -65,6 +71,9 @@ public class CompositeAggregationBuilderTests extends BaseAggregationTestCase<Co
         } else {
             histo.script(new Script(randomAlphaOfLengthBetween(10, 20)));
         }
+        if (randomBoolean()) {
+            histo.missingBucket(true);
+        }
         histo.interval(randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false));
         return histo;
     }

+ 168 - 3
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java

@@ -136,12 +136,25 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
         IndexSearcher searcher = new IndexSearcher(new MultiReader());
         QueryShardException exc =
             expectThrows(QueryShardException.class, () -> createAggregatorFactory(builder, searcher));
-        assertThat(exc.getMessage(), containsString("failed to find field [unknown] and [missing] is not provided"));
-        // should work when missing is provided
-        terms.missing("missing");
+        assertThat(exc.getMessage(), containsString("failed to find field [unknown] and [missing_bucket] is not set"));
+        // should work when missing_bucket is set
+        terms.missingBucket(true);
         createAggregatorFactory(builder, searcher);
     }
 
+    public void testMissingBucket() throws Exception {
+        TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10))
+            .field("unknown")
+            .missingBucket(true)
+            .missing("MISSING");
+        CompositeAggregationBuilder builder = new CompositeAggregationBuilder("test", Collections.singletonList(terms));
+        IndexSearcher searcher = new IndexSearcher(new MultiReader());
+        QueryShardException exc =
+            expectThrows(QueryShardException.class, () -> createAggregator(builder, searcher));
+        assertWarnings("[missing] is deprecated. Please use [missing_bucket] instead.");
+        assertThat(exc.getMessage(), containsString("cannot use [missing] option in conjunction with [missing_bucket]"));
+    }
+
     public void testWithKeyword() throws Exception {
         final List<Map<String, List<Object>>> dataset = new ArrayList<>();
         dataset.addAll(
@@ -187,6 +200,97 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
         );
     }
 
+    public void testWithKeywordAndMissingBucket() throws Exception {
+        final List<Map<String, List<Object>>> dataset = new ArrayList<>();
+        dataset.addAll(
+            Arrays.asList(
+                createDocument("keyword", "a"),
+                createDocument("long", 0L),
+                createDocument("keyword", "c"),
+                createDocument("keyword", "a"),
+                createDocument("keyword", "d"),
+                createDocument("keyword", "c"),
+                createDocument("long", 5L)
+            )
+        );
+
+        // sort ascending, null bucket is first
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery()), dataset,
+            () -> {
+                TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
+                    .field("keyword")
+                    .missingBucket(true);
+                return new CompositeAggregationBuilder("name", Collections.singletonList(terms));
+            }, (result) -> {
+                assertEquals(4, result.getBuckets().size());
+                assertEquals("{keyword=d}", result.afterKey().toString());
+                assertEquals("{keyword=null}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{keyword=a}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(1).getDocCount());
+                assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(2).getDocCount());
+                assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(3).getDocCount());
+            }
+        );
+
+        // sort descending, null bucket is last
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery()), dataset,
+            () -> {
+                TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
+                    .field("keyword")
+                    .missingBucket(true)
+                    .order(SortOrder.DESC);
+                return new CompositeAggregationBuilder("name", Collections.singletonList(terms));
+            }, (result) -> {
+                assertEquals(4, result.getBuckets().size());
+                assertEquals("{keyword=null}", result.afterKey().toString());
+                assertEquals("{keyword=null}", result.getBuckets().get(3).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(3).getDocCount());
+                assertEquals("{keyword=a}", result.getBuckets().get(2).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(2).getDocCount());
+                assertEquals("{keyword=c}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(1).getDocCount());
+                assertEquals("{keyword=d}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(0).getDocCount());
+            }
+        );
+
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
+            () -> {
+                TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
+                    .field("keyword")
+                    .missingBucket(true);
+                return new CompositeAggregationBuilder("name", Collections.singletonList(terms))
+                    .aggregateAfter(Collections.singletonMap("keyword", null));
+            }, (result) -> {
+                assertEquals(3, result.getBuckets().size());
+                assertEquals("{keyword=d}", result.afterKey().toString());
+                assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{keyword=c}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(1).getDocCount());
+                assertEquals("{keyword=d}", result.getBuckets().get(2).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(2).getDocCount());
+            }
+        );
+
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
+            () -> {
+                TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
+                    .field("keyword")
+                    .missingBucket(true)
+                    .order(SortOrder.DESC);
+                return new CompositeAggregationBuilder("name", Collections.singletonList(terms))
+                    .aggregateAfter(Collections.singletonMap("keyword", null));
+            }, (result) -> {
+                assertEquals(0, result.getBuckets().size());
+                assertNull(result.afterKey());
+            }
+        );
+    }
+
     public void testWithKeywordMissingAfter() throws Exception {
         final List<Map<String, List<Object>>> dataset = new ArrayList<>();
         dataset.addAll(
@@ -518,6 +622,67 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
         );
     }
 
+    public void testWithKeywordLongAndMissingBucket() throws Exception {
+        final List<Map<String, List<Object>>> dataset = new ArrayList<>();
+        dataset.addAll(
+            Arrays.asList(
+                createDocument("keyword", "a", "long", 100L),
+                createDocument("double", 0d),
+                createDocument("keyword", "c", "long", 100L),
+                createDocument("keyword", "a", "long", 0L),
+                createDocument("keyword", "d", "long", 10L),
+                createDocument("keyword", "c"),
+                createDocument("keyword", "c", "long", 100L),
+                createDocument("long", 100L),
+                createDocument("double", 0d)
+            )
+        );
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery()), dataset,
+            () -> new CompositeAggregationBuilder("name",
+                Arrays.asList(
+                    new TermsValuesSourceBuilder("keyword").field("keyword").missingBucket(true),
+                    new TermsValuesSourceBuilder("long").field("long").missingBucket(true)
+                )
+            ),
+            (result) -> {
+                assertEquals(7, result.getBuckets().size());
+                assertEquals("{keyword=d, long=10}", result.afterKey().toString());
+                assertEquals("{keyword=null, long=null}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{keyword=null, long=100}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(1).getDocCount());
+                assertEquals("{keyword=a, long=0}", result.getBuckets().get(2).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(2).getDocCount());
+                assertEquals("{keyword=a, long=100}", result.getBuckets().get(3).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(3).getDocCount());
+                assertEquals("{keyword=c, long=null}", result.getBuckets().get(4).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(4).getDocCount());
+                assertEquals("{keyword=c, long=100}", result.getBuckets().get(5).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(5).getDocCount());
+                assertEquals("{keyword=d, long=10}", result.getBuckets().get(6).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(6).getDocCount());
+            }
+        );
+
+        testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset,
+            () -> new CompositeAggregationBuilder("name",
+                Arrays.asList(
+                    new TermsValuesSourceBuilder("keyword").field("keyword").missingBucket(true),
+                    new TermsValuesSourceBuilder("long").field("long").missingBucket(true)
+                )
+            ).aggregateAfter(createAfterKey("keyword", "c", "long", null)
+            ),
+            (result) -> {
+                assertEquals(2, result.getBuckets().size());
+                assertEquals("{keyword=d, long=10}", result.afterKey().toString());
+                assertEquals("{keyword=c, long=100}", result.getBuckets().get(0).getKeyAsString());
+                assertEquals(2L, result.getBuckets().get(0).getDocCount());
+                assertEquals("{keyword=d, long=10}", result.getBuckets().get(1).getKeyAsString());
+                assertEquals(1L, result.getBuckets().get(1).getDocCount());
+            }
+        );
+    }
+
     public void testMultiValuedWithKeywordAndLong() throws Exception {
         final List<Map<String, List<Object>>> dataset = new ArrayList<>();
         dataset.addAll(

+ 76 - 44
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java

@@ -129,21 +129,24 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
                     assert(false);
             }
         }
-        testRandomCase(true, types);
+        testRandomCase(types);
     }
 
     private void testRandomCase(ClassAndName... types) throws IOException {
-        testRandomCase(true, types);
-        testRandomCase(false, types);
+        testRandomCase(true, true, types);
+        testRandomCase(true, false, types);
+        testRandomCase(false, true, types);
+        testRandomCase(false, false, types);
     }
 
-    private void testRandomCase(boolean forceMerge, ClassAndName... types) throws IOException {
+    private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndName... types) throws IOException {
         final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
         int numDocs = randomIntBetween(50, 100);
         List<Comparable<?>[]> possibleValues = new ArrayList<>();
         for (ClassAndName type : types) {
-            int numValues = randomIntBetween(1, numDocs*2);
-            Comparable<?>[] values = new Comparable[numValues];
+            final Comparable<?>[] values;
+            int numValues = randomIntBetween(1, numDocs * 2);
+            values = new Comparable[numValues];
             if (type.clazz == Long.class) {
                 for (int i = 0; i < numValues; i++) {
                     values[i] = randomLong();
@@ -157,7 +160,7 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
                     values[i] = new BytesRef(randomAlphaOfLengthBetween(5, 50));
                 }
             } else {
-                assert(false);
+                assert (false);
             }
             possibleValues.add(values);
         }
@@ -171,30 +174,34 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
                     boolean hasAllField = true;
                     for (int j = 0; j < types.length; j++) {
                         int numValues = randomIntBetween(0, 5);
+                        List<Comparable<?>> values = new ArrayList<>();
                         if (numValues == 0) {
                             hasAllField = false;
-                        }
-                        List<Comparable<?>> values = new ArrayList<>();
-                        for (int k = 0; k < numValues; k++) {
-                            values.add(possibleValues.get(j)[randomIntBetween(0, possibleValues.get(j).length-1)]);
-                            if (types[j].clazz == Long.class) {
-                                long value = (Long) values.get(k);
-                                document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), value));
-                                document.add(new LongPoint(types[j].fieldType.name(), value));
-                            } else if (types[j].clazz == Double.class) {
-                                document.add(new SortedNumericDocValuesField(types[j].fieldType.name(),
-                                    NumericUtils.doubleToSortableLong((Double) values.get(k))));
-                            } else if (types[j].clazz == BytesRef.class) {
-                                BytesRef value = (BytesRef) values.get(k);
-                                document.add(new SortedSetDocValuesField(types[j].fieldType.name(), (BytesRef) values.get(k)));
-                                document.add(new TextField(types[j].fieldType.name(), value.utf8ToString(), Field.Store.NO));
-                            } else {
-                                assert(false);
+                            if (missingBucket) {
+                                values.add(null);
+                            }
+                        } else {
+                            for (int k = 0; k < numValues; k++) {
+                                values.add(possibleValues.get(j)[randomIntBetween(0, possibleValues.get(j).length - 1)]);
+                                if (types[j].clazz == Long.class) {
+                                    long value = (Long) values.get(k);
+                                    document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), value));
+                                    document.add(new LongPoint(types[j].fieldType.name(), value));
+                                } else if (types[j].clazz == Double.class) {
+                                    document.add(new SortedNumericDocValuesField(types[j].fieldType.name(),
+                                        NumericUtils.doubleToSortableLong((Double) values.get(k))));
+                                } else if (types[j].clazz == BytesRef.class) {
+                                    BytesRef value = (BytesRef) values.get(k);
+                                    document.add(new SortedSetDocValuesField(types[j].fieldType.name(), (BytesRef) values.get(k)));
+                                    document.add(new TextField(types[j].fieldType.name(), value.utf8ToString(), Field.Store.NO));
+                                } else {
+                                    assert (false);
+                                }
                             }
                         }
                         docValues.add(values);
                     }
-                    if (hasAllField) {
+                    if (hasAllField || missingBucket) {
                         List<CompositeKey> comb = createListCombinations(docValues);
                         keys.addAll(comb);
                     }
@@ -210,29 +217,53 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
             for (int i = 0; i < types.length; i++) {
                 final MappedFieldType fieldType = types[i].fieldType;
                 if (types[i].clazz == Long.class) {
-                    sources[i] = new LongValuesSource(bigArrays, fieldType,
-                        context -> DocValues.getSortedNumeric(context.reader(), fieldType.name()), value -> value,
-                        DocValueFormat.RAW, null, size, 1);
+                    sources[i] = new LongValuesSource(
+                        bigArrays,
+                        fieldType,
+                        context -> DocValues.getSortedNumeric(context.reader(), fieldType.name()),
+                        value -> value,
+                        DocValueFormat.RAW,
+                        missingBucket,
+                        null,
+                        size,
+                        1
+                    );
                 } else if (types[i].clazz == Double.class) {
                     sources[i] = new DoubleValuesSource(
-                        bigArrays, fieldType,
+                        bigArrays,
+                        fieldType,
                         context -> FieldData.sortableLongBitsToDoubles(DocValues.getSortedNumeric(context.reader(), fieldType.name())),
-                        DocValueFormat.RAW, null, size, 1
+                        DocValueFormat.RAW,
+                        missingBucket,
+                        null,
+                        size,
+                        1
                     );
                 } else if (types[i].clazz == BytesRef.class) {
                     if (forceMerge) {
                         // we don't create global ordinals but we test this mode when the reader has a single segment
                         // since ordinals are global in this case.
                         sources[i] = new GlobalOrdinalValuesSource(
-                            bigArrays, fieldType,
+                            bigArrays,
+                            fieldType,
                             context -> DocValues.getSortedSet(context.reader(), fieldType.name()),
-                            DocValueFormat.RAW, null, size, 1
+                            DocValueFormat.RAW,
+                            missingBucket,
+                            null,
+                            size,
+                            1
                         );
                     } else {
                         sources[i] = new BinaryValuesSource(
+                            bigArrays,
+                            (b) -> {},
                             fieldType,
                             context -> FieldData.toString(DocValues.getSortedSet(context.reader(), fieldType.name())),
-                            DocValueFormat.RAW, null, size, 1
+                            DocValueFormat.RAW,
+                            missingBucket,
+                            null,
+                            size,
+                            1
                         );
                     }
                 } else {
@@ -241,20 +272,13 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
             }
             CompositeKey[] expected = keys.toArray(new CompositeKey[0]);
             Arrays.sort(expected, (a, b) -> compareKey(a, b));
-            CompositeValuesCollectorQueue queue = new CompositeValuesCollectorQueue(sources, size);
-            final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery());
             for (boolean withProducer : new boolean[] {true, false}) {
-                if (withProducer && docsProducer == null) {
-                    continue;
-                }
                 int pos = 0;
                 CompositeKey last = null;
                 while (pos < size) {
-                    queue.clear();
-                    if (last != null) {
-                        queue.setAfter(last.values());
-                    }
-
+                    final CompositeValuesCollectorQueue queue =
+                        new CompositeValuesCollectorQueue(BigArrays.NON_RECYCLING_INSTANCE, sources, size, last);
+                    final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery());
                     for (LeafReaderContext leafReaderContext : reader.leaves()) {
                         final LeafBucketCollector leafCollector = new LeafBucketCollector() {
                             @Override
@@ -262,7 +286,7 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
                                 queue.addIfCompetitive();
                             }
                         };
-                        if (withProducer) {
+                        if (docsProducer != null && withProducer) {
                             assertEquals(DocIdSet.EMPTY,
                                 docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false));
                         } else {
@@ -310,6 +334,14 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase {
     private static int compareKey(CompositeKey key1, CompositeKey key2) {
         assert key1.size() == key2.size();
         for (int i = 0; i < key1.size(); i++) {
+            if (key1.get(i) == null) {
+                if (key2.get(i) == null) {
+                    continue;
+                }
+                return -1;
+            } else if (key2.get(i) == null) {
+                return 1;
+            }
             Comparable<Object> cmp1 = (Comparable<Object>) key1.get(i);
             int cmp = cmp1.compareTo(key2.get(i));
             if (cmp != 0) {

+ 68 - 1
server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java

@@ -40,9 +40,12 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
         MappedFieldType keyword = new KeywordFieldMapper.KeywordFieldType();
         keyword.setName("keyword");
         BinaryValuesSource source = new BinaryValuesSource(
+            BigArrays.NON_RECYCLING_INSTANCE,
+            (b) -> {},
             keyword,
             context -> null,
             DocValueFormat.RAW,
+            false,
             null,
             1,
             1
@@ -55,9 +58,12 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
             new TermQuery(new Term("keyword", "toto)"))));
 
         source = new BinaryValuesSource(
+            BigArrays.NON_RECYCLING_INSTANCE,
+            (b) -> {},
             keyword,
             context -> null,
             DocValueFormat.RAW,
+            false,
             "missing_value",
             1,
             1
@@ -66,9 +72,26 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
         assertNull(source.createSortedDocsProducerOrNull(reader, null));
 
         source = new BinaryValuesSource(
+            BigArrays.NON_RECYCLING_INSTANCE,
+            (b) -> {},
             keyword,
             context -> null,
             DocValueFormat.RAW,
+            true,
+            null,
+            1,
+            1
+        );
+        assertNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
+        assertNull(source.createSortedDocsProducerOrNull(reader, null));
+
+        source = new BinaryValuesSource(
+            BigArrays.NON_RECYCLING_INSTANCE,
+            (b) -> {},
+            keyword,
+            context -> null,
+            DocValueFormat.RAW,
+            false,
             null,
             0,
             -1
@@ -77,7 +100,16 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
 
         MappedFieldType ip = new IpFieldMapper.IpFieldType();
         ip.setName("ip");
-        source = new BinaryValuesSource(ip, context -> null, DocValueFormat.RAW,null, 1, 1);
+        source = new BinaryValuesSource(
+            BigArrays.NON_RECYCLING_INSTANCE,
+            (b) -> {},
+            ip,
+            context -> null,
+            DocValueFormat.RAW,
+            false,
+            null,
+            1,
+            1);
         assertNull(source.createSortedDocsProducerOrNull(reader, null));
     }
 
@@ -88,6 +120,7 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
             BigArrays.NON_RECYCLING_INSTANCE,
             keyword, context -> null,
             DocValueFormat.RAW,
+            false,
             null,
             1,
             1
@@ -104,6 +137,7 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
             keyword,
             context -> null,
             DocValueFormat.RAW,
+            false,
             "missing_value",
             1,
             1
@@ -116,6 +150,20 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
             keyword,
             context -> null,
             DocValueFormat.RAW,
+            true,
+            null,
+            1,
+            1
+        );
+        assertNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
+        assertNull(source.createSortedDocsProducerOrNull(reader, null));
+
+        source = new GlobalOrdinalValuesSource(
+            BigArrays.NON_RECYCLING_INSTANCE,
+            keyword,
+            context -> null,
+            DocValueFormat.RAW,
+            false,
             null,
             1,
             -1
@@ -129,6 +177,7 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
             ip,
             context -> null,
             DocValueFormat.RAW,
+            false,
             null,
             1,
             1
@@ -152,6 +201,7 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
                     context -> null,
                     value -> value,
                     DocValueFormat.RAW,
+                    false,
                     null,
                     1,
                     1
@@ -169,6 +219,7 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
                     context -> null,
                     value -> value,
                     DocValueFormat.RAW,
+                    false,
                     0d,
                     1,
                     1);
@@ -176,12 +227,27 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
                 assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, null));
                 assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)"))));
 
+                sourceWithMissing = new LongValuesSource(
+                    BigArrays.NON_RECYCLING_INSTANCE,
+                    number,
+                    context -> null,
+                    value -> value,
+                    DocValueFormat.RAW,
+                    true,
+                    null,
+                    1,
+                    1);
+                assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()));
+                assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, null));
+                assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)"))));
+
                 LongValuesSource sourceRev = new LongValuesSource(
                     BigArrays.NON_RECYCLING_INSTANCE,
                     number,
                     context -> null,
                     value -> value,
                     DocValueFormat.RAW,
+                    false,
                     null,
                     1,
                     -1
@@ -195,6 +261,7 @@ public class SingleDimensionValuesSourceTests extends ESTestCase {
                     number,
                     context -> null,
                     DocValueFormat.RAW,
+                    false,
                     null,
                     1,
                     1