|
@@ -10,7 +10,9 @@ package org.elasticsearch.search.aggregations.bucket;
|
|
|
|
|
|
import org.apache.lucene.index.LeafReaderContext;
|
|
|
import org.elasticsearch.common.breaker.CircuitBreaker;
|
|
|
+import org.elasticsearch.common.util.IntArray;
|
|
|
import org.elasticsearch.common.util.LongArray;
|
|
|
+import org.elasticsearch.common.util.ObjectArray;
|
|
|
import org.elasticsearch.core.Releasable;
|
|
|
import org.elasticsearch.search.aggregations.AggregationErrors;
|
|
|
import org.elasticsearch.search.aggregations.Aggregator;
|
|
@@ -155,22 +157,22 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
/**
|
|
|
* Hook to allow taking an action before building the sub agg results.
|
|
|
*/
|
|
|
- protected void prepareSubAggs(long[] ordsToCollect) throws IOException {}
|
|
|
+ protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {}
|
|
|
|
|
|
/**
|
|
|
* Build the results of the sub-aggregations of the buckets at each of
|
|
|
* the provided ordinals.
|
|
|
* <p>
|
|
|
* Most aggregations should probably use something like
|
|
|
- * {@link #buildSubAggsForAllBuckets(Object[][], ToLongFunction, BiConsumer)}
|
|
|
- * or {@link #buildAggregationsForVariableBuckets(long[], LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)}
|
|
|
- * or {@link #buildAggregationsForFixedBucketCount(long[], int, BucketBuilderForFixedCount, Function)}
|
|
|
- * or {@link #buildAggregationsForSingleBucket(long[], SingleBucketResultBuilder)}
|
|
|
+ * {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)}
|
|
|
+ * or {@link #buildAggregationsForVariableBuckets(LongArray, LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)}
|
|
|
+ * or {@link #buildAggregationsForFixedBucketCount(LongArray, int, BucketBuilderForFixedCount, Function)}
|
|
|
+ * or {@link #buildAggregationsForSingleBucket(LongArray, SingleBucketResultBuilder)}
|
|
|
* instead of calling this directly.
|
|
|
* @return the sub-aggregation results in the same order as the provided
|
|
|
* array of ordinals
|
|
|
*/
|
|
|
- protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
|
|
|
+ protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(LongArray bucketOrdsToCollect) throws IOException {
|
|
|
prepareSubAggs(bucketOrdsToCollect);
|
|
|
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
|
|
|
for (int i = 0; i < subAggregators.length; i++) {
|
|
@@ -204,26 +206,28 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
* @param setAggs how to set the sub-aggregation results on a bucket
|
|
|
*/
|
|
|
protected final <B> void buildSubAggsForAllBuckets(
|
|
|
- B[][] buckets,
|
|
|
+ ObjectArray<B[]> buckets,
|
|
|
ToLongFunction<B> bucketToOrd,
|
|
|
BiConsumer<B, InternalAggregations> setAggs
|
|
|
) throws IOException {
|
|
|
- int totalBucketOrdsToCollect = 0;
|
|
|
- for (B[] bucketsForOneResult : buckets) {
|
|
|
- totalBucketOrdsToCollect += bucketsForOneResult.length;
|
|
|
+ long totalBucketOrdsToCollect = 0;
|
|
|
+ for (long b = 0; b < buckets.size(); b++) {
|
|
|
+ totalBucketOrdsToCollect += buckets.get(b).length;
|
|
|
}
|
|
|
- long[] bucketOrdsToCollect = new long[totalBucketOrdsToCollect];
|
|
|
- int s = 0;
|
|
|
- for (B[] bucketsForOneResult : buckets) {
|
|
|
- for (B bucket : bucketsForOneResult) {
|
|
|
- bucketOrdsToCollect[s++] = bucketToOrd.applyAsLong(bucket);
|
|
|
+
|
|
|
+ try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalBucketOrdsToCollect)) {
|
|
|
+ int s = 0;
|
|
|
+ for (long ord = 0; ord < buckets.size(); ord++) {
|
|
|
+ for (B bucket : buckets.get(ord)) {
|
|
|
+ bucketOrdsToCollect.set(s++, bucketToOrd.applyAsLong(bucket));
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- var results = buildSubAggsForBuckets(bucketOrdsToCollect);
|
|
|
- s = 0;
|
|
|
- for (B[] bucket : buckets) {
|
|
|
- for (int b = 0; b < bucket.length; b++) {
|
|
|
- setAggs.accept(bucket[b], results.apply(s++));
|
|
|
+ var results = buildSubAggsForBuckets(bucketOrdsToCollect);
|
|
|
+ s = 0;
|
|
|
+ for (long ord = 0; ord < buckets.size(); ord++) {
|
|
|
+ for (B value : buckets.get(ord)) {
|
|
|
+ setAggs.accept(value, results.apply(s++));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -237,37 +241,38 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
* @param resultBuilder how to build a result from buckets
|
|
|
*/
|
|
|
protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(
|
|
|
- long[] owningBucketOrds,
|
|
|
+ LongArray owningBucketOrds,
|
|
|
int bucketsPerOwningBucketOrd,
|
|
|
BucketBuilderForFixedCount<B> bucketBuilder,
|
|
|
Function<List<B>, InternalAggregation> resultBuilder
|
|
|
) throws IOException {
|
|
|
- int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
|
|
|
- long[] bucketOrdsToCollect = new long[totalBuckets];
|
|
|
- int bucketOrdIdx = 0;
|
|
|
- for (long owningBucketOrd : owningBucketOrds) {
|
|
|
- long ord = owningBucketOrd * bucketsPerOwningBucketOrd;
|
|
|
- for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
|
|
|
- bucketOrdsToCollect[bucketOrdIdx++] = ord++;
|
|
|
+ try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(owningBucketOrds.size() * bucketsPerOwningBucketOrd)) {
|
|
|
+ int bucketOrdIdx = 0;
|
|
|
+ for (long i = 0; i < owningBucketOrds.size(); i++) {
|
|
|
+ long ord = owningBucketOrds.get(i) * bucketsPerOwningBucketOrd;
|
|
|
+ for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
|
|
|
+ bucketOrdsToCollect.set(bucketOrdIdx++, ord++);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- bucketOrdIdx = 0;
|
|
|
- var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
|
|
|
- InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
|
|
|
- for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
|
|
|
- List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
|
|
|
- for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
|
|
|
- buckets.add(
|
|
|
- bucketBuilder.build(
|
|
|
- offsetInOwningOrd,
|
|
|
- bucketDocCount(bucketOrdsToCollect[bucketOrdIdx]),
|
|
|
- subAggregationResults.apply(bucketOrdIdx++)
|
|
|
- )
|
|
|
- );
|
|
|
+ bucketOrdIdx = 0;
|
|
|
+ var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
|
|
|
+
|
|
|
+ InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
|
|
|
+ for (int owningOrdIdx = 0; owningOrdIdx < results.length; owningOrdIdx++) {
|
|
|
+ List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
|
|
|
+ for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
|
|
|
+ buckets.add(
|
|
|
+ bucketBuilder.build(
|
|
|
+ offsetInOwningOrd,
|
|
|
+ bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx)),
|
|
|
+ subAggregationResults.apply(bucketOrdIdx++)
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
+ results[owningOrdIdx] = resultBuilder.apply(buckets);
|
|
|
}
|
|
|
- results[owningOrdIdx] = resultBuilder.apply(buckets);
|
|
|
+ return results;
|
|
|
}
|
|
|
- return results;
|
|
|
}
|
|
|
|
|
|
@FunctionalInterface
|
|
@@ -280,17 +285,19 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
* @param owningBucketOrds owning bucket ordinals for which to build the results
|
|
|
* @param resultBuilder how to build a result from the sub aggregation results
|
|
|
*/
|
|
|
- protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds, SingleBucketResultBuilder resultBuilder)
|
|
|
- throws IOException {
|
|
|
+ protected final InternalAggregation[] buildAggregationsForSingleBucket(
|
|
|
+ LongArray owningBucketOrds,
|
|
|
+ SingleBucketResultBuilder resultBuilder
|
|
|
+ ) throws IOException {
|
|
|
/*
|
|
|
* It'd be entirely reasonable to call
|
|
|
* `consumeBucketsAndMaybeBreak(owningBucketOrds.length)`
|
|
|
* here but we don't because single bucket aggs never have.
|
|
|
*/
|
|
|
var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
|
|
|
- InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
|
|
|
- for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
|
|
|
- results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], subAggregationResults.apply(ordIdx));
|
|
|
+ InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
|
|
|
+ for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
|
|
|
+ results[ordIdx] = resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx));
|
|
|
}
|
|
|
return results;
|
|
|
}
|
|
@@ -307,54 +314,60 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
* @param bucketOrds hash of values to the bucket ordinal
|
|
|
*/
|
|
|
protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
|
|
|
- long[] owningBucketOrds,
|
|
|
+ LongArray owningBucketOrds,
|
|
|
LongKeyedBucketOrds bucketOrds,
|
|
|
BucketBuilderForVariable<B> bucketBuilder,
|
|
|
ResultBuilderForVariable<B> resultBuilder
|
|
|
) throws IOException {
|
|
|
long totalOrdsToCollect = 0;
|
|
|
- final int[] bucketsInOrd = new int[owningBucketOrds.length];
|
|
|
- for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
|
|
|
- final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
|
|
|
- bucketsInOrd[ordIdx] = (int) bucketCount;
|
|
|
- totalOrdsToCollect += bucketCount;
|
|
|
- }
|
|
|
- if (totalOrdsToCollect > Integer.MAX_VALUE) {
|
|
|
- // TODO: We should instrument this error. While it is correct for it to be a 400 class IllegalArgumentException, there is not
|
|
|
- // much the user can do about that. If this occurs with any frequency, we should do something about it.
|
|
|
- throw new IllegalArgumentException(
|
|
|
- "Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]"
|
|
|
- );
|
|
|
- }
|
|
|
- long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
|
|
|
- int b = 0;
|
|
|
- for (long owningBucketOrd : owningBucketOrds) {
|
|
|
- LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
|
|
|
- while (ordsEnum.next()) {
|
|
|
- bucketOrdsToCollect[b++] = ordsEnum.ord();
|
|
|
+ try (IntArray bucketsInOrd = bigArrays().newIntArray(owningBucketOrds.size())) {
|
|
|
+ for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
|
|
|
+ final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx));
|
|
|
+ bucketsInOrd.set(ordIdx, (int) bucketCount);
|
|
|
+ totalOrdsToCollect += bucketCount;
|
|
|
}
|
|
|
- }
|
|
|
- var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
|
|
|
-
|
|
|
- InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
|
|
|
- b = 0;
|
|
|
- for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
|
|
|
- List<B> buckets = new ArrayList<>(bucketsInOrd[ordIdx]);
|
|
|
- LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
|
|
|
- while (ordsEnum.next()) {
|
|
|
- if (bucketOrdsToCollect[b] != ordsEnum.ord()) {
|
|
|
- // If we hit this, something has gone horribly wrong and we need to investigate
|
|
|
- throw AggregationErrors.iterationOrderChangedWithoutMutating(
|
|
|
- bucketOrds.toString(),
|
|
|
- ordsEnum.ord(),
|
|
|
- bucketOrdsToCollect[b]
|
|
|
- );
|
|
|
+ if (totalOrdsToCollect > Integer.MAX_VALUE) {
|
|
|
+ // TODO: We should instrument this error. While it is correct for it to be a 400 class IllegalArgumentException, there is
|
|
|
+ // not
|
|
|
+ // much the user can do about that. If this occurs with any frequency, we should do something about it.
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]"
|
|
|
+ );
|
|
|
+ }
|
|
|
+ try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) {
|
|
|
+ int b = 0;
|
|
|
+ for (long i = 0; i < owningBucketOrds.size(); i++) {
|
|
|
+ LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i));
|
|
|
+ while (ordsEnum.next()) {
|
|
|
+ bucketOrdsToCollect.set(b++, ordsEnum.ord());
|
|
|
+ }
|
|
|
}
|
|
|
- buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++)));
|
|
|
+ var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
|
|
|
+
|
|
|
+ InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
|
|
|
+ b = 0;
|
|
|
+ for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
|
|
|
+ final long owningBucketOrd = owningBucketOrds.get(ordIdx);
|
|
|
+ List<B> buckets = new ArrayList<>(bucketsInOrd.get(ordIdx));
|
|
|
+ LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
|
|
|
+ while (ordsEnum.next()) {
|
|
|
+ if (bucketOrdsToCollect.get(b) != ordsEnum.ord()) {
|
|
|
+ // If we hit this, something has gone horribly wrong and we need to investigate
|
|
|
+ throw AggregationErrors.iterationOrderChangedWithoutMutating(
|
|
|
+ bucketOrds.toString(),
|
|
|
+ ordsEnum.ord(),
|
|
|
+ bucketOrdsToCollect.get(b)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ buckets.add(
|
|
|
+ bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++))
|
|
|
+ );
|
|
|
+ }
|
|
|
+ results[ordIdx] = resultBuilder.build(owningBucketOrd, buckets);
|
|
|
+ }
|
|
|
+ return results;
|
|
|
}
|
|
|
- results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], buckets);
|
|
|
}
|
|
|
- return results;
|
|
|
}
|
|
|
|
|
|
@FunctionalInterface
|