|
@@ -0,0 +1,179 @@
|
|
|
+/*
|
|
|
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
|
+ * or more contributor license agreements. Licensed under the Elastic License;
|
|
|
+ * you may not use this file except in compliance with the Elastic License.
|
|
|
+ */
|
|
|
+package org.elasticsearch.xpack.analytics.aggregations.metrics;
|
|
|
+
|
|
|
+import com.tdunning.math.stats.Centroid;
|
|
|
+import com.tdunning.math.stats.TDigest;
|
|
|
+import org.apache.lucene.document.BinaryDocValuesField;
|
|
|
+import org.apache.lucene.document.Field;
|
|
|
+import org.apache.lucene.document.StringField;
|
|
|
+import org.apache.lucene.index.DirectoryReader;
|
|
|
+import org.apache.lucene.index.IndexReader;
|
|
|
+import org.apache.lucene.index.RandomIndexWriter;
|
|
|
+import org.apache.lucene.index.Term;
|
|
|
+import org.apache.lucene.search.IndexSearcher;
|
|
|
+import org.apache.lucene.search.MatchAllDocsQuery;
|
|
|
+import org.apache.lucene.search.Query;
|
|
|
+import org.apache.lucene.search.TermQuery;
|
|
|
+import org.apache.lucene.store.Directory;
|
|
|
+import org.elasticsearch.common.CheckedConsumer;
|
|
|
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|
|
+import org.elasticsearch.index.mapper.MappedFieldType;
|
|
|
+import org.elasticsearch.plugins.SearchPlugin;
|
|
|
+import org.elasticsearch.search.aggregations.AggregationBuilder;
|
|
|
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
|
|
|
+import org.elasticsearch.search.aggregations.metrics.InternalSum;
|
|
|
+import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
|
|
|
+import org.elasticsearch.search.aggregations.metrics.TDigestState;
|
|
|
+import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
|
|
|
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
|
|
|
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
|
|
+import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
|
|
|
+import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
|
|
|
+import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.List;
|
|
|
+import java.util.function.Consumer;
|
|
|
+
|
|
|
+import static java.util.Collections.singleton;
|
|
|
+import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
|
|
|
+
|
|
|
+public class HistoBackedSumAggregatorTests extends AggregatorTestCase {
|
|
|
+
|
|
|
+ private static final String FIELD_NAME = "field";
|
|
|
+
|
|
|
+ public void testNoDocs() throws IOException {
|
|
|
+ testCase(new MatchAllDocsQuery(), iw -> {
|
|
|
+ // Intentionally not writing any docs
|
|
|
+ }, sum -> {
|
|
|
+ assertEquals(0L, sum.getValue(), 0d);
|
|
|
+ assertFalse(AggregationInspectionHelper.hasValue(sum));
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testNoMatchingField() throws IOException {
|
|
|
+ testCase(new MatchAllDocsQuery(), iw -> {
|
|
|
+ iw.addDocument(singleton(getDocValue("wrong_field", new double[] {3, 1.2, 10})));
|
|
|
+ iw.addDocument(singleton(getDocValue("wrong_field", new double[] {5.3, 6, 20})));
|
|
|
+ }, sum -> {
|
|
|
+ assertEquals(0L, sum.getValue(), 0d);
|
|
|
+ assertFalse(AggregationInspectionHelper.hasValue(sum));
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testSimpleHistogram() throws IOException {
|
|
|
+ testCase(new MatchAllDocsQuery(), iw -> {
|
|
|
+ iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {3, 1.2, 10})));
|
|
|
+ iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {5.3, 6, 6, 20})));
|
|
|
+ iw.addDocument(singleton(getDocValue(FIELD_NAME, new double[] {-10, 0.01, 1, 90})));
|
|
|
+ }, sum -> {
|
|
|
+ assertEquals(132.51d, sum.getValue(), 0.01d);
|
|
|
+ assertTrue(AggregationInspectionHelper.hasValue(sum));
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testQueryFiltering() throws IOException {
|
|
|
+ testCase(new TermQuery(new Term("match", "yes")), iw -> {
|
|
|
+ iw.addDocument(Arrays.asList(
|
|
|
+ new StringField("match", "yes", Field.Store.NO),
|
|
|
+ getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
|
|
|
+ );
|
|
|
+ iw.addDocument(Arrays.asList(
|
|
|
+ new StringField("match", "yes", Field.Store.NO),
|
|
|
+ getDocValue(FIELD_NAME, new double[] {5.3, 6, 20}))
|
|
|
+ );
|
|
|
+ iw.addDocument(Arrays.asList(
|
|
|
+ new StringField("match", "no", Field.Store.NO),
|
|
|
+ getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
|
|
|
+ );
|
|
|
+ iw.addDocument(Arrays.asList(
|
|
|
+ new StringField("match", "no", Field.Store.NO),
|
|
|
+ getDocValue(FIELD_NAME, new double[] {3, 1.2, 10}))
|
|
|
+ );
|
|
|
+ iw.addDocument(Arrays.asList(
|
|
|
+ new StringField("match", "yes", Field.Store.NO),
|
|
|
+ getDocValue(FIELD_NAME, new double[] {-10, 0.01, 1, 90}))
|
|
|
+ );
|
|
|
+ }, sum -> {
|
|
|
+ assertEquals(126.51d, sum.getValue(), 0.01d);
|
|
|
+ assertTrue(AggregationInspectionHelper.hasValue(sum));
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testCase(Query query,
|
|
|
+ CheckedConsumer<RandomIndexWriter, IOException> indexer,
|
|
|
+ Consumer<InternalSum> verify) throws IOException {
|
|
|
+ testCase(query, sum("_name").field(FIELD_NAME), indexer, verify, singleton(defaultFieldType(FIELD_NAME)));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testCase(Query query,
|
|
|
+ SumAggregationBuilder aggregationBuilder,
|
|
|
+ CheckedConsumer<RandomIndexWriter, IOException> indexer,
|
|
|
+ Consumer<InternalSum> verify,
|
|
|
+ Collection<MappedFieldType> fieldTypes) throws IOException {
|
|
|
+ try (Directory directory = newDirectory()) {
|
|
|
+ try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
|
|
|
+ indexer.accept(indexWriter);
|
|
|
+ }
|
|
|
+
|
|
|
+ try (IndexReader indexReader = DirectoryReader.open(directory)) {
|
|
|
+ IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
|
|
|
+
|
|
|
+ final MappedFieldType[] fieldTypesArray = fieldTypes.toArray(new MappedFieldType[0]);
|
|
|
+ final InternalSum internalSum = search(indexSearcher, query, aggregationBuilder, fieldTypesArray);
|
|
|
+ verify.accept(internalSum);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private BinaryDocValuesField getDocValue(String fieldName, double[] values) throws IOException {
|
|
|
+ TDigest histogram = new TDigestState(100.0); //default
|
|
|
+ for (double value : values) {
|
|
|
+ histogram.add(value);
|
|
|
+ }
|
|
|
+ BytesStreamOutput streamOutput = new BytesStreamOutput();
|
|
|
+ histogram.compress();
|
|
|
+ Collection<Centroid> centroids = histogram.centroids();
|
|
|
+ Iterator<Centroid> iterator = centroids.iterator();
|
|
|
+ while ( iterator.hasNext()) {
|
|
|
+ Centroid centroid = iterator.next();
|
|
|
+ streamOutput.writeVInt(centroid.count());
|
|
|
+ streamOutput.writeDouble(centroid.mean());
|
|
|
+ }
|
|
|
+ return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected List<SearchPlugin> getSearchPlugins() {
|
|
|
+ return List.of(new AnalyticsPlugin());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected List<ValuesSourceType> getSupportedValuesSourceTypes() {
|
|
|
+ // Note: this is the same list as Core, plus Analytics
|
|
|
+ return List.of(
|
|
|
+ CoreValuesSourceType.NUMERIC,
|
|
|
+ CoreValuesSourceType.DATE,
|
|
|
+ CoreValuesSourceType.BOOLEAN,
|
|
|
+ AnalyticsValuesSourceType.HISTOGRAM
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) {
|
|
|
+ return new SumAggregationBuilder("_name").field(fieldName);
|
|
|
+ }
|
|
|
+
|
|
|
+ private MappedFieldType defaultFieldType(String fieldName) {
|
|
|
+ MappedFieldType fieldType = new HistogramFieldMapper.Builder("field").fieldType();
|
|
|
+ fieldType.setName("field");
|
|
|
+ return fieldType;
|
|
|
+ }
|
|
|
+}
|