|
@@ -9,7 +9,6 @@
|
|
|
package org.elasticsearch.search.aggregations.bucket;
|
|
|
|
|
|
import org.apache.lucene.index.LeafReaderContext;
|
|
|
-import org.elasticsearch.common.breaker.CircuitBreaker;
|
|
|
import org.elasticsearch.common.util.IntArray;
|
|
|
import org.elasticsearch.common.util.LongArray;
|
|
|
import org.elasticsearch.common.util.ObjectArray;
|
|
@@ -42,10 +41,9 @@ import java.util.function.LongUnaryOperator;
|
|
|
import java.util.function.ToLongFunction;
|
|
|
|
|
|
public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
- private final CircuitBreaker breaker;
|
|
|
+
|
|
|
private LongArray docCounts;
|
|
|
protected final DocCountProvider docCountProvider;
|
|
|
- private int callCount;
|
|
|
|
|
|
@SuppressWarnings("this-escape")
|
|
|
public BucketsAggregator(
|
|
@@ -57,7 +55,6 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
Map<String, Object> metadata
|
|
|
) throws IOException {
|
|
|
super(name, factories, aggCtx, parent, bucketCardinality, metadata);
|
|
|
- breaker = aggCtx.breaker();
|
|
|
docCounts = bigArrays().newLongArray(1, true);
|
|
|
docCountProvider = new DocCountProvider();
|
|
|
}
|
|
@@ -83,7 +80,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
grow(bucketOrd + 1);
|
|
|
int docCount = docCountProvider.getDocCount(doc);
|
|
|
if (docCounts.increment(bucketOrd, docCount) == docCount) {
|
|
|
- updateCircuitBreaker("allocated_buckets");
|
|
|
+ checkRealMemoryCB("allocated_buckets");
|
|
|
}
|
|
|
subCollector.collect(doc, bucketOrd);
|
|
|
}
|
|
@@ -176,7 +173,7 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
prepareSubAggs(bucketOrdsToCollect);
|
|
|
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
|
|
|
for (int i = 0; i < subAggregators.length; i++) {
|
|
|
- updateCircuitBreaker("building_sub_aggregation");
|
|
|
+ checkRealMemoryCB("building_sub_aggregation");
|
|
|
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
|
|
|
}
|
|
|
return subAggsForBucketFunction(aggregations);
|
|
@@ -247,31 +244,30 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
Function<List<B>, InternalAggregation> resultBuilder
|
|
|
) throws IOException {
|
|
|
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(owningBucketOrds.size() * bucketsPerOwningBucketOrd)) {
|
|
|
- int bucketOrdIdx = 0;
|
|
|
+ final int[] bucketOrdIdx = new int[] { 0 };
|
|
|
for (long i = 0; i < owningBucketOrds.size(); i++) {
|
|
|
long ord = owningBucketOrds.get(i) * bucketsPerOwningBucketOrd;
|
|
|
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
|
|
|
- bucketOrdsToCollect.set(bucketOrdIdx++, ord++);
|
|
|
+ bucketOrdsToCollect.set(bucketOrdIdx[0]++, ord++);
|
|
|
}
|
|
|
}
|
|
|
- bucketOrdIdx = 0;
|
|
|
+ bucketOrdIdx[0] = 0;
|
|
|
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
|
|
|
|
|
|
- InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
|
|
|
- for (int owningOrdIdx = 0; owningOrdIdx < results.length; owningOrdIdx++) {
|
|
|
+ return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
|
|
|
List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
|
|
|
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
|
|
|
+ checkRealMemoryCBForInternalBucket();
|
|
|
buckets.add(
|
|
|
bucketBuilder.build(
|
|
|
offsetInOwningOrd,
|
|
|
- bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx)),
|
|
|
- subAggregationResults.apply(bucketOrdIdx++)
|
|
|
+ bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx[0])),
|
|
|
+ subAggregationResults.apply(bucketOrdIdx[0]++)
|
|
|
)
|
|
|
);
|
|
|
}
|
|
|
- results[owningOrdIdx] = resultBuilder.apply(buckets);
|
|
|
- }
|
|
|
- return results;
|
|
|
+ return resultBuilder.apply(buckets);
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -295,11 +291,10 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
* here but we don't because single bucket aggs never have.
|
|
|
*/
|
|
|
var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
|
|
|
- InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
|
|
|
- for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
|
|
|
- results[ordIdx] = resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx));
|
|
|
- }
|
|
|
- return results;
|
|
|
+ return buildAggregations(
|
|
|
+ Math.toIntExact(owningBucketOrds.size()),
|
|
|
+ ordIdx -> resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx))
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
@FunctionalInterface
|
|
@@ -335,37 +330,36 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
);
|
|
|
}
|
|
|
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) {
|
|
|
- int b = 0;
|
|
|
+ final int[] b = new int[] { 0 };
|
|
|
for (long i = 0; i < owningBucketOrds.size(); i++) {
|
|
|
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i));
|
|
|
while (ordsEnum.next()) {
|
|
|
- bucketOrdsToCollect.set(b++, ordsEnum.ord());
|
|
|
+ bucketOrdsToCollect.set(b[0]++, ordsEnum.ord());
|
|
|
}
|
|
|
}
|
|
|
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
|
|
|
|
|
|
- InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
|
|
|
- b = 0;
|
|
|
- for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
|
|
|
+ b[0] = 0;
|
|
|
+ return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
|
|
|
final long owningBucketOrd = owningBucketOrds.get(ordIdx);
|
|
|
List<B> buckets = new ArrayList<>(bucketsInOrd.get(ordIdx));
|
|
|
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
|
|
|
while (ordsEnum.next()) {
|
|
|
- if (bucketOrdsToCollect.get(b) != ordsEnum.ord()) {
|
|
|
+ if (bucketOrdsToCollect.get(b[0]) != ordsEnum.ord()) {
|
|
|
// If we hit this, something has gone horribly wrong and we need to investigate
|
|
|
throw AggregationErrors.iterationOrderChangedWithoutMutating(
|
|
|
bucketOrds.toString(),
|
|
|
ordsEnum.ord(),
|
|
|
- bucketOrdsToCollect.get(b)
|
|
|
+ bucketOrdsToCollect.get(b[0])
|
|
|
);
|
|
|
}
|
|
|
+ checkRealMemoryCBForInternalBucket();
|
|
|
buckets.add(
|
|
|
- bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++))
|
|
|
+ bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b[0]++))
|
|
|
);
|
|
|
}
|
|
|
- results[ordIdx] = resultBuilder.build(owningBucketOrd, buckets);
|
|
|
- }
|
|
|
- return results;
|
|
|
+ return resultBuilder.build(owningBucketOrd, buckets);
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -425,14 +419,9 @@ public abstract class BucketsAggregator extends AggregatorBase {
|
|
|
docCountProvider.setLeafReaderContext(ctx);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This method calls the circuit breaker from time to time in order to give it a chance to check available
|
|
|
- * memory in the parent breaker (Which should be a real memory breaker) and break the execution if we are running out.
|
|
|
- * To achieve that, we are passing 0 as the estimated bytes every 1024 calls
|
|
|
- */
|
|
|
- private void updateCircuitBreaker(String label) {
|
|
|
- if ((++callCount & 0x3FF) == 0) {
|
|
|
- breaker.addEstimateBytesAndMaybeBreak(0, label);
|
|
|
- }
|
|
|
+ /** This method should be called whenever a new bucket object is created. It will check the real memory
|
|
|
+ * circuit breaker in a sampling fashion. See {@link #checkRealMemoryCB(String)} */
|
|
|
+ protected final void checkRealMemoryCBForInternalBucket() {
|
|
|
+ checkRealMemoryCB("internal_bucket");
|
|
|
}
|
|
|
}
|