Browse Source

ML: Add support for single bucket aggs in Datafeeds (#37544)

Single bucket aggs are now supported in datafeed aggregation configurations.
Benjamin Trent 6 years ago
parent
commit
12cdf1cba4

+ 47 - 0
docs/reference/ml/aggregations.asciidoc

@@ -145,6 +145,53 @@ pipeline aggregation to find the first order derivative of the counter
 ----------------------------------
 // NOTCONSOLE
 
+{dfeeds-cap} not only supports multi-bucket aggregations, but also single bucket aggregations.
+The following shows two `filter` aggregations, each gathering the number of unique entries for
+the `error` field.
+
+[source,js]
+----------------------------------
+{
+  "job_id":"servers-unique-errors",
+  "indices": ["logs-*"],
+  "aggregations": {
+    "buckets": {
+      "date_histogram": {
+        "field": "time",
+        "interval": "360s",
+        "time_zone": "UTC"
+      },
+      "aggregations": {
+        "time": {
+          "max": {"field": "time"}
+        }
+        "server1": {
+          "filter": {"term": {"source": "server-name-1"}},
+          "aggregations": {
+            "server1_error_count": {
+              "value_count": {
+                "field": "error"
+              }
+            }
+          }
+        },
+        "server2": {
+          "filter": {"term": {"source": "server-name-2"}},
+          "aggregations": {
+            "server2_error_count": {
+              "value_count": {
+                "field": "error"
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
+----------------------------------
+// NOTCONSOLE
+
 When you define an aggregation in a {dfeed}, it must have the following form:
 
 [source,js]

+ 38 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java

@@ -894,6 +894,44 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
             "action [indices:admin/xpack/rollup/search] is unauthorized for user [ml_admin_plus_data]\""));
     }
 
+    public void testLookbackWithSingleBucketAgg() throws Exception {
+        String jobId = "aggs-date-histogram-with-single-bucket-agg-job";
+        Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
+        createJobRequest.setJsonEntity("{\n"
+            + "  \"description\": \"Aggs job\",\n"
+            + "  \"analysis_config\": {\n"
+            + "    \"bucket_span\": \"3600s\",\n"
+            + "    \"summary_count_field_name\": \"doc_count\",\n"
+            + "    \"detectors\": [\n"
+            + "      {\n"
+            + "        \"function\": \"mean\",\n"
+            + "        \"field_name\": \"responsetime\""
+            + "      }\n"
+            + "    ]\n"
+            + "  },\n"
+            + "  \"data_description\": {\"time_field\": \"time stamp\"}\n"
+            + "}");
+        client().performRequest(createJobRequest);
+
+        String datafeedId = "datafeed-" + jobId;
+        String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"},"
+            + "\"aggregations\":{"
+            + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+            + "\"airlineFilter\":{\"filter\":{\"term\": {\"airline\":\"AAA\"}},"
+            + "  \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
+        openJob(client(), jobId);
+
+        startDatafeedAndWaitUntilStopped(datafeedId);
+        waitUntilJobIsClosed(jobId);
+        Response jobStatsResponse = client().performRequest(new Request("GET",
+            MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
+        String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
+        assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
+        assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
+        assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
+    }
+
     public void testRealtime() throws Exception {
         String jobId = "job-realtime-1";
         createJob(jobId, "airline");

+ 36 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java

@@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.metrics.Max;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
@@ -34,6 +35,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.
@@ -93,18 +95,39 @@ class AggregationToJsonProcessor {
 
         List<Aggregation> leafAggregations = new ArrayList<>();
         List<MultiBucketsAggregation> bucketAggregations = new ArrayList<>();
+        List<SingleBucketAggregation> singleBucketAggregations = new ArrayList<>();
 
         // Sort into leaf and bucket aggregations.
         // The leaf aggregations will be processed first.
         for (Aggregation agg : aggregations) {
             if (agg instanceof MultiBucketsAggregation) {
                 bucketAggregations.add((MultiBucketsAggregation)agg);
+            } else if (agg instanceof SingleBucketAggregation){
+                // Skip a level down for single bucket aggs, if they have a sub-agg that is not
+                // a bucketed agg we should treat it like a leaf in this bucket
+                SingleBucketAggregation singleBucketAggregation = (SingleBucketAggregation)agg;
+                for (Aggregation subAgg : singleBucketAggregation.getAggregations()) {
+                    if (subAgg instanceof MultiBucketsAggregation || subAgg instanceof SingleBucketAggregation) {
+                        singleBucketAggregations.add(singleBucketAggregation);
+                    } else {
+                        leafAggregations.add(subAgg);
+                    }
+                }
             } else {
                 leafAggregations.add(agg);
             }
         }
 
-        if (bucketAggregations.size() > 1) {
+        // If on the current level (indicated via bucketAggregations) or one of the next levels (singleBucketAggregations)
+        // we have more than 1 `MultiBucketsAggregation`, we should error out.
+        // We need to make the check in this way as each of the items in `singleBucketAggregations` is treated as a separate branch
+        // in the recursive handling of this method.
+        int bucketAggLevelCount = Math.max(bucketAggregations.size(), (int)singleBucketAggregations.stream()
+            .flatMap(s -> asList(s.getAggregations()).stream())
+            .filter(MultiBucketsAggregation.class::isInstance)
+            .count());
+
+        if (bucketAggLevelCount > 1) {
             throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported");
         }
 
@@ -137,6 +160,18 @@ class AggregationToJsonProcessor {
                 }
             }
         }
+        noMoreBucketsToProcess = singleBucketAggregations.isEmpty() && noMoreBucketsToProcess;
+        // we support more than one `SingleBucketAggregation` at each level
+        // However, we only want to recurse with multi/single bucket aggs.
+        // Non-bucketed sub-aggregations were handle as leaf aggregations at this level
+        for (SingleBucketAggregation singleBucketAggregation : singleBucketAggregations) {
+            processAggs(singleBucketAggregation.getDocCount(),
+                asList(singleBucketAggregation.getAggregations())
+                    .stream()
+                    .filter(
+                        aggregation -> (aggregation instanceof MultiBucketsAggregation || aggregation instanceof SingleBucketAggregation))
+                    .collect(Collectors.toList()));
+        }
 
         // If there are no more bucket aggregations to process we've reached the end
         // and it's time to write the doc

+ 9 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
 
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@@ -37,6 +38,14 @@ public final class AggregationTestUtils {
         return bucket;
     }
 
+    static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, List<Aggregation> subAggregations) {
+        SingleBucketAggregation singleBucketAggregation = mock(SingleBucketAggregation.class);
+        when(singleBucketAggregation.getName()).thenReturn(name);
+        when(singleBucketAggregation.getDocCount()).thenReturn(docCount);
+        when(singleBucketAggregation.getAggregations()).thenReturn(createAggs(subAggregations));
+        return singleBucketAggregation;
+    }
+
     static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) {
         return createHistogramBucket(timestamp, docCount, Collections.emptyList());
     }

+ 33 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java

@@ -31,6 +31,7 @@ import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.Aggregat
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createPercentiles;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleBucketAgg;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
 import static org.hamcrest.Matchers.containsString;
@@ -439,6 +440,38 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
                 "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
     }
 
+    public void testSingleBucketAgg() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+            createHistogramBucket(1000L, 4, Arrays.asList(
+                createMax("time", 1000),
+                createSingleBucketAgg("agg1", 3, Collections.singletonList(createMax("field1", 5.0))),
+                createSingleBucketAgg("agg2", 1, Collections.singletonList(createMax("field2", 3.0))))),
+            createHistogramBucket(2000L, 7, Arrays.asList(
+                createMax("time", 2000),
+                createSingleBucketAgg("agg2", 3, Collections.singletonList(createMax("field2", 1.0))),
+                createSingleBucketAgg("agg1", 4, Collections.singletonList(createMax("field1", 7.0))))));
+
+        String json = aggToString(Sets.newHashSet("field1", "field2"), histogramBuckets);
+
+        assertThat(json, equalTo("{\"time\":1000,\"field1\":5.0,\"field2\":3.0,\"doc_count\":4}" +
+            " {\"time\":2000,\"field2\":1.0,\"field1\":7.0,\"doc_count\":7}"));
+    }
+
+    public void testSingleBucketAgg_failureWithSubMultiBucket() throws IOException {
+
+        List<Histogram.Bucket> histogramBuckets = Collections.singletonList(
+            createHistogramBucket(1000L, 4, Arrays.asList(
+                createMax("time", 1000),
+                createSingleBucketAgg("agg1", 3,
+                    Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()),createMax("field1", 5.0))),
+                createSingleBucketAgg("agg2", 1,
+                    Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()),createMax("field1", 3.0))))));
+
+
+        expectThrows(IllegalArgumentException.class,
+            () -> aggToString(Sets.newHashSet("my_field"), histogramBuckets));
+    }
+
     private String aggToString(Set<String> fields, Histogram.Bucket bucket) throws IOException {
         return aggToString(fields, Collections.singletonList(bucket));
     }