|
@@ -46,8 +46,8 @@ class WeightedAvgAggregator extends NumericMetricsAggregator.SingleValue {
|
|
|
private final MultiValuesSource.NumericMultiValuesSource valuesSources;
|
|
|
|
|
|
private DoubleArray weights;
|
|
|
- private DoubleArray sums;
|
|
|
- private DoubleArray sumCompensations;
|
|
|
+ private DoubleArray valueSums;
|
|
|
+ private DoubleArray valueCompensations;
|
|
|
private DoubleArray weightCompensations;
|
|
|
private DocValueFormat format;
|
|
|
|
|
@@ -60,8 +60,8 @@ class WeightedAvgAggregator extends NumericMetricsAggregator.SingleValue {
|
|
|
if (valuesSources != null) {
|
|
|
final BigArrays bigArrays = context.bigArrays();
|
|
|
weights = bigArrays.newDoubleArray(1, true);
|
|
|
- sums = bigArrays.newDoubleArray(1, true);
|
|
|
- sumCompensations = bigArrays.newDoubleArray(1, true);
|
|
|
+ valueSums = bigArrays.newDoubleArray(1, true);
|
|
|
+ valueCompensations = bigArrays.newDoubleArray(1, true);
|
|
|
weightCompensations = bigArrays.newDoubleArray(1, true);
|
|
|
}
|
|
|
}
|
|
@@ -80,13 +80,15 @@ class WeightedAvgAggregator extends NumericMetricsAggregator.SingleValue {
|
|
|
final BigArrays bigArrays = context.bigArrays();
|
|
|
final SortedNumericDoubleValues docValues = valuesSources.getField(VALUE_FIELD.getPreferredName(), ctx);
|
|
|
final SortedNumericDoubleValues docWeights = valuesSources.getField(WEIGHT_FIELD.getPreferredName(), ctx);
|
|
|
+ final CompensatedSum compensatedValueSum = new CompensatedSum(0, 0);
|
|
|
+ final CompensatedSum compensatedWeightSum = new CompensatedSum(0, 0);
|
|
|
|
|
|
return new LeafBucketCollectorBase(sub, docValues) {
|
|
|
@Override
|
|
|
public void collect(int doc, long bucket) throws IOException {
|
|
|
weights = bigArrays.grow(weights, bucket + 1);
|
|
|
- sums = bigArrays.grow(sums, bucket + 1);
|
|
|
- sumCompensations = bigArrays.grow(sumCompensations, bucket + 1);
|
|
|
+ valueSums = bigArrays.grow(valueSums, bucket + 1);
|
|
|
+ valueCompensations = bigArrays.grow(valueCompensations, bucket + 1);
|
|
|
weightCompensations = bigArrays.grow(weightCompensations, bucket + 1);
|
|
|
|
|
|
if (docValues.advanceExact(doc) && docWeights.advanceExact(doc)) {
|
|
@@ -102,42 +104,43 @@ class WeightedAvgAggregator extends NumericMetricsAggregator.SingleValue {
|
|
|
final int numValues = docValues.docValueCount();
|
|
|
assert numValues > 0;
|
|
|
|
|
|
+ double valueSum = valueSums.get(bucket);
|
|
|
+ double valueCompensation = valueCompensations.get(bucket);
|
|
|
+ compensatedValueSum.reset(valueSum, valueCompensation);
|
|
|
+
|
|
|
+ double weightSum = weights.get(bucket);
|
|
|
+ double weightCompensation = weightCompensations.get(bucket);
|
|
|
+ compensatedWeightSum.reset(weightSum, weightCompensation);
|
|
|
+
|
|
|
for (int i = 0; i < numValues; i++) {
|
|
|
- kahanSum(docValues.nextValue() * weight, sums, sumCompensations, bucket);
|
|
|
- kahanSum(weight, weights, weightCompensations, bucket);
|
|
|
+ compensatedValueSum.add(docValues.nextValue() * weight);
|
|
|
+ compensatedWeightSum.add(weight);
|
|
|
}
|
|
|
+
|
|
|
+ valueSums.set(bucket, compensatedValueSum.value());
|
|
|
+ valueCompensations.set(bucket, compensatedValueSum.delta());
|
|
|
+ weights.set(bucket, compensatedWeightSum.value());
|
|
|
+ weightCompensations.set(bucket, compensatedWeightSum.delta());
|
|
|
}
|
|
|
}
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- private static void kahanSum(double value, DoubleArray values, DoubleArray compensations, long bucket) {
|
|
|
- // Compute the sum of double values with Kahan summation algorithm which is more
|
|
|
- // accurate than naive summation.
|
|
|
- double sum = values.get(bucket);
|
|
|
- double compensation = compensations.get(bucket);
|
|
|
-
|
|
|
- CompensatedSum kahanSummation = new CompensatedSum(sum, compensation)
|
|
|
- .add(value);
|
|
|
-
|
|
|
- values.set(bucket, kahanSummation.value());
|
|
|
- compensations.set(bucket, kahanSummation.delta());
|
|
|
- }
|
|
|
|
|
|
@Override
|
|
|
public double metric(long owningBucketOrd) {
|
|
|
- if (valuesSources == null || owningBucketOrd >= sums.size()) {
|
|
|
+ if (valuesSources == null || owningBucketOrd >= valueSums.size()) {
|
|
|
return Double.NaN;
|
|
|
}
|
|
|
- return sums.get(owningBucketOrd) / weights.get(owningBucketOrd);
|
|
|
+ return valueSums.get(owningBucketOrd) / weights.get(owningBucketOrd);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public InternalAggregation buildAggregation(long bucket) {
|
|
|
- if (valuesSources == null || bucket >= sums.size()) {
|
|
|
+ if (valuesSources == null || bucket >= valueSums.size()) {
|
|
|
return buildEmptyAggregation();
|
|
|
}
|
|
|
- return new InternalWeightedAvg(name, sums.get(bucket), weights.get(bucket), format, pipelineAggregators(), metaData());
|
|
|
+ return new InternalWeightedAvg(name, valueSums.get(bucket), weights.get(bucket), format, pipelineAggregators(), metaData());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -147,7 +150,7 @@ class WeightedAvgAggregator extends NumericMetricsAggregator.SingleValue {
|
|
|
|
|
|
@Override
|
|
|
public void doClose() {
|
|
|
- Releasables.close(weights, sums, sumCompensations, weightCompensations);
|
|
|
+ Releasables.close(weights, valueSums, valueCompensations, weightCompensations);
|
|
|
}
|
|
|
|
|
|
}
|