@@ -22,6 +22,7 @@ import org.elasticsearch.datastreams.DataStreamsPlugin;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
@@ -43,15 +44,22 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
 
 @SuppressWarnings("unchecked")
+@ESIntegTestCase.ClusterScope(maxNumDataNodes = 1)
 public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
-
     private static final Long NUM_DOCS = 2000L;
+    private static final Long TIME_RANGE_SECONDS = 3600L;
     private static final String DATASTREAM_NAME = "tsit_ds";
-    private List<XContentBuilder> documents = null;
+    private static final Integer SECONDS_IN_WINDOW = 60;
+    private List<XContentBuilder> documents;
     private TSDataGenerationHelper dataGenerationHelper;
 
     List<List<Object>> consumeRows(EsqlQueryResponse resp) {
@@ -103,7 +111,8 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
         MAX,
         MIN,
         AVG,
-        SUM
+        SUM,
+        COUNT
     }
 
     static List<Integer> valuesInWindow(List<Map<String, Object>> pointsInGroup, String metricName) {
@@ -115,15 +124,64 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
         return values;
     }
 
-    static Double aggregateValuesInWindow(List<Integer> values, Agg agg) {
-        if (values.isEmpty()) {
-            throw new IllegalArgumentException("No values to aggregate for " + agg + " operation");
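+    // Groups data points by timeseries identity: documents that share the same
+    // attribute key/value pairs belong to the same timeseries.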
+    static Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> groupByTimeseries(
+        List<Map<String, Object>> pointsInGroup,
+        String metricName
+    ) {
+        return pointsInGroup.stream()
+            .filter(doc -> doc.containsKey("metrics") && ((Map<String, Object>) doc.get("metrics")).containsKey(metricName))
+            .map(doc -> {
+                String docKey = ((Map<String, Object>) doc.get("attributes")).entrySet()
+                    .stream()
+                    .map(entry -> entry.getKey() + ":" + entry.getValue())
+                    .collect(Collectors.joining(","));
+                var docTs = Instant.parse((String) doc.get("@timestamp"));
+                var docValue = (Integer) ((Map<String, Object>) doc.get("metrics")).get(metricName);
+                return new Tuple<>(docKey, new Tuple<>(docTs, docValue));
+            })
+            .collect(Collectors.groupingBy(Tuple::v1));
+    }
+
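+    // Two-level aggregation: timeseriesAgg first collapses each timeseries to a
+    // single value, then crossAgg combines those values across timeseries.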
+    static Object aggregatePerTimeseries(
+        Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> timeseries,
+        Agg crossAgg,
+        Agg timeseriesAgg
+    ) {
+        var res = timeseries.values().stream().map(timeseriesList -> {
+            List<Integer> values = timeseriesList.stream().map(t -> t.v2().v2()).collect(Collectors.toList());
+            return aggregateValuesInWindow(values, timeseriesAgg);
+        }).filter(Objects::nonNull).toList();
+
+        if (res.isEmpty() && timeseriesAgg == Agg.COUNT) {
+            res = List.of(0.0);
         }
+
+        return switch (crossAgg) {
+            case MAX -> res.isEmpty()
+                ? null
+                : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).max().orElseThrow()).longValue();
+            case MIN -> res.isEmpty()
+                ? null
+                : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).min().orElseThrow()).longValue();
+            case AVG -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).average().orElseThrow();
+            case SUM -> res.isEmpty() ? null : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).sum()).longValue();
+            case COUNT -> Integer.toUnsignedLong(res.size());
+        };
+    }
+
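+    // Aggregates the raw values of a single window. SUM returns null on empty
+    // input so that empty windows can be filtered out by callers.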
+    static Double aggregateValuesInWindow(List<Integer> values, Agg agg) {
         return switch (agg) {
             case MAX -> Double.valueOf(values.stream().max(Integer::compareTo).orElseThrow());
             case MIN -> Double.valueOf(values.stream().min(Integer::compareTo).orElseThrow());
             case AVG -> values.stream().mapToDouble(Integer::doubleValue).average().orElseThrow();
-            case SUM -> values.stream().mapToDouble(Integer::doubleValue).sum();
+            case SUM -> values.isEmpty() ? null : values.stream().mapToDouble(Integer::doubleValue).sum();
+            case COUNT -> (double) values.size();
         };
     }
 
@@ -150,10 +208,92 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
         return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, AggregateMetricMapperPlugin.class, EsqlPlugin.class);
     }
 
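+    // Expected rates are expressed as ranges rather than point values, since the
+    // exact result depends on how rate() extrapolates samples within each window.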
+    record RateRange(Double lower, Double upper) implements Comparable<RateRange> {
+        @Override
+        public int compareTo(RateRange o) {
+            // Compare first by lower bound, then by upper bound
+            int cmp = this.lower.compareTo(o.lower);
+            if (cmp == 0) {
+                return this.upper.compareTo(o.upper);
+            }
+            return cmp;
+        }
+
+        public int compareToFindingMax(RateRange o) {
+            // Compare first by upper bound, then by lower bound
+            int cmp = this.upper.compareTo(o.upper);
+            if (cmp == 0) {
+                return this.lower.compareTo(o.lower);
+            }
+            return cmp;
+        }
+    }
+
+    // A record that holds the count, max, avg, min and sum of the rates calculated from a group of timeseries.
+    record RateStats(Long count, RateRange max, RateRange avg, RateRange min, RateRange sum) {}
+
+    static RateStats calculateRateAggregation(
+        Collection<List<Tuple<String, Tuple<Instant, Integer>>>> allTimeseries,
+        Integer secondsInWindow
+    ) {
+        List<RateRange> allRates = allTimeseries.stream().map(timeseries -> {
+            if (timeseries.size() < 2) {
+                return null;
+            }
+            // Sort the timeseries by timestamp
+            timeseries.sort((t1, t2) -> t1.v2().v1().compareTo(t2.v2().v1()));
+            var firstTs = timeseries.getFirst().v2().v1();
+            var lastTs = timeseries.getLast().v2().v1();
+            Integer lastValue = null;
+            Double counterGrowth = 0.0;
+            for (Tuple<String, Tuple<Instant, Integer>> point : timeseries) {
+                var currentValue = point.v2().v2();
+                if (currentValue == null) {
+                    throw new IllegalArgumentException("Null value in counter timeseries");
+                }
+                if (lastValue == null) {
+                    lastValue = currentValue; // Initialize with the first value
+                    continue;
+                }
+                if (currentValue > lastValue) {
+                    counterGrowth += currentValue - lastValue; // Incremental growth
+                } else if (currentValue < lastValue) {
+                    // If the value decreased, we assume a counter reset and count growth from zero up to the current value
+                    counterGrowth += currentValue;
+                }
+                lastValue = currentValue; // Update last value for next iteration
+            }
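+            // The true rate must lie between the counter growth divided by the full
+            // window length and the growth divided by the observed span between the
+            // first and last samples.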
+            return new RateRange(
+                counterGrowth / secondsInWindow * 0.99, // Add 1% tolerance to the lower bound
+                1000.0 * counterGrowth / (lastTs.toEpochMilli() - firstTs.toEpochMilli()) * 1.01 // Add 1% tolerance to the upper bound
+            );
+        }).filter(Objects::nonNull).toList();
+        if (allRates.isEmpty()) {
+            return new RateStats(0L, null, null, null, new RateRange(0.0, 0.0));
+        }
+        return new RateStats(
+            (long) allRates.size(),
+            allRates.stream().max(RateRange::compareToFindingMax).orElseThrow(),
+            new RateRange(
+                allRates.stream().mapToDouble(r -> r.lower).average().orElseThrow(),
+                allRates.stream().mapToDouble(r -> r.upper).average().orElseThrow()
+            ),
+            allRates.stream().min(RateRange::compareTo).orElseThrow(),
+            new RateRange(allRates.stream().mapToDouble(r -> r.lower).sum(), allRates.stream().mapToDouble(r -> r.upper).sum())
+        );
+    }
+
     void putTSDBIndexTemplate(List<String> patterns, @Nullable String mappingString) throws IOException {
         Settings.Builder settingsBuilder = Settings.builder();
         // Ensure it will be a TSDB data stream
         settingsBuilder.put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES);
+        settingsBuilder.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), "2025-07-31T00:00:00Z");
+        settingsBuilder.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2025-07-31T12:00:00Z");
         CompressedXContent mappings = mappingString == null ? null : CompressedXContent.fromJSON(mappingString);
         TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(
             RandomizedTimeSeriesIT.DATASTREAM_NAME
@@ -171,7 +311,7 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
 
     @Before
     public void populateIndex() throws IOException {
-        dataGenerationHelper = new TSDataGenerationHelper(NUM_DOCS);
+        dataGenerationHelper = new TSDataGenerationHelper(NUM_DOCS, TIME_RANGE_SECONDS);
         final XContentBuilder builder = XContentFactory.jsonBuilder();
         builder.map(dataGenerationHelper.mapping.raw());
         final String jsonMappings = Strings.toString(builder);
@@ -190,6 +330,105 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
         }
     }
 
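+    // Asserts that the actual value lies within the expected range (inclusive on
+    // both bounds); a null expected range means no rate could be computed.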
+    void checkWithin(Double actual, RateRange expected) {
+        if (expected == null) {
+            assertThat(actual, equalTo(null));
+            return;
+        }
+        assertThat(actual, allOf(lessThanOrEqualTo(expected.upper), not(lessThan(expected.lower))));
+    }
+
+    void assertNoFailedWindows(List<String> failedWindows, List<List<Object>> rows) {
+        if (failedWindows.isEmpty() == false) {
+            var pctFailures = (double) failedWindows.size() / rows.size() * 100;
+            var failureDetails = String.join("\n", failedWindows);
+            if (failureDetails.length() > 2000) {
+                failureDetails = failureDetails.substring(0, 2000) + "\n... (truncated)";
+            }
+            throw new AssertionError("Failed " + failedWindows.size() + " windows (" + pctFailures + "%):\n" + failureDetails);
+        }
+    }
+
+    /**
+     * This test validates Rate metrics aggregation with grouping by time bucket and a subset of dimensions.
+     * The subset of dimensions is a random subset of the dimensions present in the data.
+     * The test checks that the count, max, min, and avg values of the rate metric match
+     * the same values calculated from the documents in each group.
+     */
+    public void testRateGroupBySubset() {
+        var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics);
+        var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
+        try (var resp = run(String.format(Locale.ROOT, """
+            TS %s
+            | STATS count(rate(metrics.counter_hdd.bytes.read)),
+                max(rate(metrics.counter_hdd.bytes.read)),
+                avg(rate(metrics.counter_hdd.bytes.read)),
+                min(rate(metrics.counter_hdd.bytes.read))
+                BY tbucket=bucket(@timestamp, 1 minute), %s
+            | SORT tbucket
+            | LIMIT 1000
+            """, DATASTREAM_NAME, dimensionsStr))) {
+            List<List<Object>> rows = consumeRows(resp);
+            List<String> failedWindows = new ArrayList<>();
+            var groups = groupedRows(documents, dimensions, SECONDS_IN_WINDOW);
+            for (List<Object> row : rows) {
+                var rowKey = getRowKey(row, dimensions, 4);
+                var windowDataPoints = groups.get(rowKey);
+                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counter_hdd.bytes.read");
+                var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
+                try {
+                    assertThat(row.getFirst(), equalTo(rateAgg.count));
+                    checkWithin((Double) row.get(1), rateAgg.max);
+                    checkWithin((Double) row.get(2), rateAgg.avg);
+                    checkWithin((Double) row.get(3), rateAgg.min);
+                } catch (AssertionError e) {
+                    failedWindows.add("Failed for row:\n" + row + "\nWanted: " + rateAgg + "\nException: " + e.getMessage());
+                }
+            }
+            assertNoFailedWindows(failedWindows, rows);
+        }
+    }
+
+    /**
+     * This test validates Rate metrics aggregation with grouping by time bucket only.
+     * The test checks that the count, max, min, and avg values of the rate metric match
+     * the same values calculated from the documents in the group. Because there is no grouping by dimensions,
+     * there is only one metric group per time bucket.
+     */
+    public void testRateGroupByNothing() {
+        var groups = groupedRows(documents, List.of(), SECONDS_IN_WINDOW);
+        try (var resp = run(String.format(Locale.ROOT, """
+            TS %s
+            | STATS count(rate(metrics.counter_hdd.bytes.read)),
+                max(rate(metrics.counter_hdd.bytes.read)),
+                avg(rate(metrics.counter_hdd.bytes.read)),
+                min(rate(metrics.counter_hdd.bytes.read))
+                BY tbucket=bucket(@timestamp, 1 minute)
+            | SORT tbucket
+            | LIMIT 1000
+            """, DATASTREAM_NAME))) {
+            List<List<Object>> rows = consumeRows(resp);
+            List<String> failedWindows = new ArrayList<>();
+            for (List<Object> row : rows) {
+                var windowStart = windowStart(row.get(4), SECONDS_IN_WINDOW);
+                var windowDataPoints = groups.get(List.of(Long.toString(windowStart)));
+                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counter_hdd.bytes.read");
+                var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
+                try {
+                    assertThat(row.getFirst(), equalTo(rateAgg.count));
+                    checkWithin((Double) row.get(1), rateAgg.max);
+                    checkWithin((Double) row.get(2), rateAgg.avg);
+                    checkWithin((Double) row.get(3), rateAgg.min);
+                } catch (AssertionError e) {
+                    failedWindows.add("Failed for row:\n" + row + "\nWanted: " + rateAgg + "\nException: " + e.getMessage());
+                }
+            }
+            assertNoFailedWindows(failedWindows, rows);
+        }
+    }
+
     /**
      * This test validates Gauge metrics aggregation with grouping by time bucket and a subset of dimensions.
      * The subset of dimensions is a random subset of the dimensions present in the data.
@@ -207,29 +446,34 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
                 min(min_over_time(metrics.gauge_hdd.bytes.used)),
                 sum(count_over_time(metrics.gauge_hdd.bytes.used)),
                 sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
-                avg(avg_over_time(metrics.gauge_hdd.bytes.used))
+                avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
+                count(count_over_time(metrics.gauge_hdd.bytes.used))
                 BY tbucket=bucket(@timestamp, 1 minute), %s
             | SORT tbucket
             | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) {
             var groups = groupedRows(documents, dimensions, 60);
             List<List<Object>> rows = consumeRows(resp);
             for (List<Object> row : rows) {
-                var rowKey = getRowKey(row, dimensions, 6);
+                var rowKey = getRowKey(row, dimensions, 7);
+                var tsGroups = groupByTimeseries(groups.get(rowKey), "gauge_hdd.bytes.used");
                 var docValues = valuesInWindow(groups.get(rowKey), "gauge_hdd.bytes.used");
                 if (row.get(0) instanceof List) {
                     assertThat(
-                        (Collection<Long>) row.get(0),
+                        (Collection<Long>) row.getFirst(),
                         containsInAnyOrder(docValues.stream().mapToLong(Integer::longValue).boxed().toArray(Long[]::new))
                     );
                 } else {
-                    assertThat(row.get(0), equalTo(docValues.getFirst().longValue()));
+                    assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? null : docValues.getFirst().longValue()));
                 }
-                assertThat(row.get(1), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MAX))));
-                assertThat(row.get(2), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MIN))));
-                assertThat(row.get(3), equalTo((long) docValues.size()));
-                assertThat(row.get(4), equalTo(aggregateValuesInWindow(docValues, Agg.SUM).longValue()));
-                // TODO: fix then enable
-                // assertThat(row.get(5), equalTo(aggregateValuesInWindow(docValues, Agg.SUM) / (double) docValues.size()));
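+                // The inner Agg mirrors the per-timeseries *_over_time function;
+                // the outer Agg combines those results across timeseries.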
+                assertThat(row.get(1), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
+                assertThat(row.get(2), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
+                assertThat(row.get(3), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
+                assertThat(row.get(4), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
+                var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG);
+                assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
+                // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
             }
         }
     }
@@ -249,29 +493,32 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
                 min(min_over_time(metrics.gauge_hdd.bytes.used)),
                 sum(count_over_time(metrics.gauge_hdd.bytes.used)),
                 sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
-                avg(avg_over_time(metrics.gauge_hdd.bytes.used))
+                avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
+                count(count_over_time(metrics.gauge_hdd.bytes.used))
                 BY tbucket=bucket(@timestamp, 1 minute)
             | SORT tbucket
             | LIMIT 1000""", DATASTREAM_NAME))) {
             List<List<Object>> rows = consumeRows(resp);
             var groups = groupedRows(documents, List.of(), 60);
             for (List<Object> row : rows) {
-                var windowStart = windowStart(row.get(6), 60);
+                var windowStart = windowStart(row.get(7), 60);
                 List<Integer> docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
+                var tsGroups = groupByTimeseries(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
                 if (row.get(0) instanceof List) {
                     assertThat(
                         (Collection<Long>) row.get(0),
                         containsInAnyOrder(docValues.stream().mapToLong(Integer::longValue).boxed().toArray(Long[]::new))
                     );
                 } else {
-                    assertThat(row.get(0), equalTo(docValues.getFirst().longValue()));
+                    assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? null : docValues.getFirst().longValue()));
                 }
-                assertThat(row.get(1), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MAX))));
-                assertThat(row.get(2), equalTo(Math.round(aggregateValuesInWindow(docValues, Agg.MIN))));
-                assertThat(row.get(3), equalTo((long) docValues.size()));
-                assertThat(row.get(4), equalTo(aggregateValuesInWindow(docValues, Agg.SUM).longValue()));
-                // TODO: fix then enable
-                // assertThat(row.get(5), equalTo(aggregateValuesInWindow(docValues, Agg.SUM) / (double) docValues.size()));
+                assertThat(row.get(1), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
+                assertThat(row.get(2), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
+                assertThat(row.get(3), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
+                assertThat(row.get(4), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
+                var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG);
+                assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
+                // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
             }
         }
     }