|
@@ -19,7 +19,6 @@ import org.elasticsearch.search.aggregations.AggregatorReducer;
|
|
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
|
|
import org.elasticsearch.search.aggregations.InternalAggregations;
|
|
|
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
|
|
-import org.elasticsearch.search.aggregations.KeyComparable;
|
|
|
import org.elasticsearch.search.aggregations.bucket.BucketReducer;
|
|
|
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
|
|
|
import org.elasticsearch.search.aggregations.support.SamplingContext;
|
|
@@ -103,7 +102,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
}
|
|
|
this.reverseMuls = in.readIntArray();
|
|
|
this.missingOrders = in.readArray(MissingOrder::readFromStream, MissingOrder[]::new);
|
|
|
- this.buckets = in.readCollectionAsList((input) -> new InternalBucket(input, sourceNames, formats, reverseMuls, missingOrders));
|
|
|
+ this.buckets = in.readCollectionAsList((input) -> new InternalBucket(input, sourceNames, formats));
|
|
|
this.afterKey = in.readOptionalWriteable(CompositeKey::new);
|
|
|
this.earlyTerminated = in.readBoolean();
|
|
|
}
|
|
@@ -155,15 +154,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
|
|
|
@Override
|
|
|
public InternalBucket createBucket(InternalAggregations aggregations, InternalBucket prototype) {
|
|
|
- return new InternalBucket(
|
|
|
- prototype.sourceNames,
|
|
|
- prototype.formats,
|
|
|
- prototype.key,
|
|
|
- prototype.reverseMuls,
|
|
|
- prototype.missingOrders,
|
|
|
- prototype.docCount,
|
|
|
- aggregations
|
|
|
- );
|
|
|
+ return new InternalBucket(prototype.sourceNames, prototype.formats, prototype.key, prototype.docCount, aggregations);
|
|
|
}
|
|
|
|
|
|
public int getSize() {
|
|
@@ -206,7 +197,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
private final PriorityQueue<IteratorAndCurrent<InternalBucket>> pq = new PriorityQueue<>(size) {
|
|
|
@Override
|
|
|
protected boolean lessThan(IteratorAndCurrent<InternalBucket> a, IteratorAndCurrent<InternalBucket> b) {
|
|
|
- return a.current().compareKey(b.current()) < 0;
|
|
|
+ return a.current().compareKey(b.current(), reverseMuls, missingOrders) < 0;
|
|
|
}
|
|
|
};
|
|
|
private boolean earlyTerminated = false;
|
|
@@ -227,7 +218,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
final List<InternalBucket> result = new ArrayList<>();
|
|
|
while (pq.size() > 0) {
|
|
|
IteratorAndCurrent<InternalBucket> top = pq.top();
|
|
|
- if (lastBucket != null && top.current().compareKey(lastBucket) != 0) {
|
|
|
+ if (lastBucket != null && top.current().compareKey(lastBucket, reverseMuls, missingOrders) != 0) {
|
|
|
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
|
|
|
buckets.clear();
|
|
|
result.add(reduceBucket);
|
|
@@ -306,7 +297,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
final var reducedFormats = reducer.getProto().formats;
|
|
|
final long docCount = reducer.getDocCount();
|
|
|
final InternalAggregations aggs = reducer.getAggregations();
|
|
|
- return new InternalBucket(sourceNames, reducedFormats, reducer.getProto().key, reverseMuls, missingOrders, docCount, aggs);
|
|
|
+ return new InternalBucket(sourceNames, reducedFormats, reducer.getProto().key, docCount, aggs);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -329,16 +320,11 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
return Objects.hash(super.hashCode(), size, buckets, afterKey, Arrays.hashCode(reverseMuls), Arrays.hashCode(missingOrders));
|
|
|
}
|
|
|
|
|
|
- public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket
|
|
|
- implements
|
|
|
- CompositeAggregation.Bucket,
|
|
|
- KeyComparable<InternalBucket> {
|
|
|
+ public static class InternalBucket extends InternalMultiBucketAggregation.InternalBucket implements CompositeAggregation.Bucket {
|
|
|
|
|
|
private final CompositeKey key;
|
|
|
private final long docCount;
|
|
|
private final InternalAggregations aggregations;
|
|
|
- private final transient int[] reverseMuls;
|
|
|
- private final transient MissingOrder[] missingOrders;
|
|
|
private final transient List<String> sourceNames;
|
|
|
private final transient List<DocValueFormat> formats;
|
|
|
|
|
@@ -346,32 +332,20 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
List<String> sourceNames,
|
|
|
List<DocValueFormat> formats,
|
|
|
CompositeKey key,
|
|
|
- int[] reverseMuls,
|
|
|
- MissingOrder[] missingOrders,
|
|
|
long docCount,
|
|
|
InternalAggregations aggregations
|
|
|
) {
|
|
|
this.key = key;
|
|
|
this.docCount = docCount;
|
|
|
this.aggregations = aggregations;
|
|
|
- this.reverseMuls = reverseMuls;
|
|
|
- this.missingOrders = missingOrders;
|
|
|
this.sourceNames = sourceNames;
|
|
|
this.formats = formats;
|
|
|
}
|
|
|
|
|
|
- InternalBucket(
|
|
|
- StreamInput in,
|
|
|
- List<String> sourceNames,
|
|
|
- List<DocValueFormat> formats,
|
|
|
- int[] reverseMuls,
|
|
|
- MissingOrder[] missingOrders
|
|
|
- ) throws IOException {
|
|
|
+ InternalBucket(StreamInput in, List<String> sourceNames, List<DocValueFormat> formats) throws IOException {
|
|
|
this.key = new CompositeKey(in);
|
|
|
this.docCount = in.readVLong();
|
|
|
this.aggregations = InternalAggregations.readFrom(in);
|
|
|
- this.reverseMuls = reverseMuls;
|
|
|
- this.missingOrders = missingOrders;
|
|
|
this.sourceNames = sourceNames;
|
|
|
this.formats = formats;
|
|
|
}
|
|
@@ -444,8 +418,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
return formats;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public int compareKey(InternalBucket other) {
|
|
|
+ int compareKey(InternalBucket other, int[] reverseMuls, MissingOrder[] missingOrders) {
|
|
|
for (int i = 0; i < key.size(); i++) {
|
|
|
if (key.get(i) == null) {
|
|
|
if (other.key.get(i) == null) {
|
|
@@ -470,8 +443,6 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
|
|
|
sourceNames,
|
|
|
formats,
|
|
|
key,
|
|
|
- reverseMuls,
|
|
|
- missingOrders,
|
|
|
samplingContext.scaleUp(docCount),
|
|
|
InternalAggregations.finalizeSampling(aggregations, samplingContext)
|
|
|
);
|