|
@@ -23,6 +23,7 @@ import org.elasticsearch.compute.data.FloatBlock;
|
|
|
import org.elasticsearch.compute.data.IntBlock;
|
|
|
import org.elasticsearch.compute.data.IntVector;
|
|
|
import org.elasticsearch.compute.data.LongBlock;
|
|
|
+import org.elasticsearch.compute.data.LongVector;
|
|
|
import org.elasticsearch.compute.operator.DriverContext;
|
|
|
import org.elasticsearch.core.Releasable;
|
|
|
import org.elasticsearch.core.Releasables;
|
|
@@ -136,7 +137,6 @@ public class DeltaDoubleAggregator {
|
|
|
ensureCapacity(groupId);
|
|
|
append(groupId, timestamps.getLong(firstTs), values.getDouble(firstIndex));
|
|
|
if (valueCount > 1) {
|
|
|
- ensureCapacity(groupId);
|
|
|
append(groupId, timestamps.getLong(firstTs + 1), values.getDouble(firstIndex + 1));
|
|
|
}
|
|
|
// We are merging the state from upstream, which means we have seen
|
|
@@ -161,7 +161,7 @@ public class DeltaDoubleAggregator {
|
|
|
final BlockFactory blockFactory = driverContext.blockFactory();
|
|
|
final int positionCount = selected.getPositionCount();
|
|
|
try (
|
|
|
- LongBlock.Builder samples = blockFactory.newLongBlockBuilder(positionCount);
|
|
|
+ LongVector.FixedBuilder samples = blockFactory.newLongVectorFixedBuilder(positionCount);
|
|
|
LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2);
|
|
|
DoubleBlock.Builder values = blockFactory.newDoubleBlockBuilder(positionCount * 2);
|
|
|
) {
|
|
@@ -169,9 +169,7 @@ public class DeltaDoubleAggregator {
|
|
|
final var groupId = selected.getInt(i);
|
|
|
final var state = groupId < states.size() ? states.get(groupId) : null;
|
|
|
if (state != null) {
|
|
|
- samples.beginPositionEntry();
|
|
|
samples.appendLong(state.valuesSeen);
|
|
|
- samples.endPositionEntry();
|
|
|
timestamps.beginPositionEntry();
|
|
|
timestamps.appendLong(state.lastTimestamp);
|
|
|
if (state.valuesSeen > 1) {
|
|
@@ -191,7 +189,7 @@ public class DeltaDoubleAggregator {
|
|
|
values.appendNull();
|
|
|
}
|
|
|
}
|
|
|
- blocks[offset] = samples.build();
|
|
|
+ blocks[offset] = samples.build().asBlock();
|
|
|
blocks[offset + 1] = timestamps.build();
|
|
|
blocks[offset + 2] = values.build();
|
|
|
}
|
|
@@ -216,7 +214,7 @@ public class DeltaDoubleAggregator {
|
|
|
continue;
|
|
|
}
|
|
|
double startGap = state.firstTimestamp - rangeStart;
|
|
|
- final double averageSampleInterval = (state.lastTimestamp - state.firstTimestamp) / state.valuesSeen;
|
|
|
+ final double averageSampleInterval = ((double) state.lastTimestamp - state.firstTimestamp) / state.valuesSeen;
|
|
|
final double slope = (state.lastValue - state.firstValue) / (state.lastTimestamp - state.firstTimestamp);
|
|
|
double endGap = rangeEnd - state.lastTimestamp;
|
|
|
double calculatedFirstValue = state.firstValue;
|