Browse Source

[Transform] add support for terms agg in transforms (#56696)

This adds support for `terms` and `rare_terms` aggs in transforms. 

The default behavior is that the results are collapsed in the following manner:
`<AGG_NAME>.<BUCKET_NAME>.<SUBAGGS...>...`
Or if no sub aggs exist
`<AGG_NAME>.<BUCKET_NAME>.<_doc_count>`

The mapping is also defined as `flattened` by default. This is to avoid field explosion while still providing (limited) search and aggregation capabilities.
Benjamin Trent 5 years ago
parent
commit
fd812d2ada

+ 2 - 0
docs/reference/rest-api/common-parms.asciidoc

@@ -644,6 +644,8 @@ are supported:
 * <<search-aggregations-metrics-weight-avg-aggregation,Weighted average>>
 * <<search-aggregations-metrics-cardinality-aggregation,Cardinality>>
 * <<search-aggregations-bucket-filter-aggregation,Filter>>
+* <<search-aggregations-bucket-rare-terms-aggregation, Rare Terms>>
+* <<search-aggregations-bucket-terms-aggregation, Terms>>
 * <<search-aggregations-metrics-geobounds-aggregation,Geo bounds>>
 * <<search-aggregations-metrics-geocentroid-aggregation,Geo centroid>>
 * <<search-aggregations-metrics-max-aggregation,Max>>

+ 3 - 3
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/preview_transforms.yml

@@ -265,12 +265,12 @@ setup:
               "group_by": {
                 "time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
               "aggs": {
-                "vals": {"terms": {"field":"airline"}}
+                "vals": {"significant_terms": {"field":"airline"}}
               }
             }
           }
   - do:
-      catch: /Unsupported aggregation type \[terms\]/
+      catch: /Unsupported aggregation type \[significant_terms\]/
       transform.preview_transform:
         body: >
           {
@@ -280,7 +280,7 @@ setup:
               "group_by": {
                 "time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
               "aggs": {
-                "vals": {"terms": {"field":"airline"}}
+                "vals": {"significant_terms": {"field":"airline"}}
               }
             }
           }

+ 94 - 1
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

@@ -18,6 +18,8 @@ import org.junit.Before;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -25,11 +27,11 @@ import java.util.Set;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
@@ -508,6 +510,97 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         assertDateHistogramPivot(REVIEWS_DATE_NANO_INDEX_NAME);
     }
 
+    @SuppressWarnings("unchecked")
+    public void testPivotWithTermsAgg() throws Exception {
+        String transformId = "simple_terms_agg_pivot";
+        String transformIndex = "pivot_reviews_via_histogram_with_terms_agg";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
+
+        final Request createTransformRequest = createRequestWithAuth(
+            "PUT",
+            getTransformEndpoint() + transformId,
+            BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
+        );
+
+        String config = "{"
+            + " \"source\": {\"index\":\""
+            + REVIEWS_INDEX_NAME
+            + "\"},"
+            + " \"dest\": {\"index\":\""
+            + transformIndex
+            + "\"},";
+
+        config += " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"every_2\": {"
+            + "       \"histogram\": {"
+            + "         \"interval\": 2,\"field\":\"stars\""
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"common_users\": {"
+            + "       \"terms\": {"
+            + "         \"field\": \"user_id\","
+            + "         \"size\": 2"
+            + "        },"
+            + "        \"aggs\" : {"
+            + "          \"common_businesses\": {"
+            + "            \"terms\": {"
+            + "              \"field\": \"business_id\","
+            + "              \"size\": 2"
+            + "         }}"
+            + "        } "
+            +"      },"
+            + "     \"rare_users\": {"
+            + "       \"rare_terms\": {"
+            + "         \"field\": \"user_id\""
+            + " } } } }"
+            + "}";
+
+        createTransformRequest.setJsonEntity(config);
+        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
+        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        startAndWaitForTransform(transformId, transformIndex);
+        assertTrue(indexExists(transformIndex));
+
+        // we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0
+        Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
+        assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
+
+        // get and check some term results
+        Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
+
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+        Map<String, Integer> commonUsers = (Map<String, Integer>) ((List<?>) XContentMapValues.extractValue(
+            "hits.hits._source.common_users",
+            searchResult
+        )).get(0);
+        Map<String, Integer> rareUsers = (Map<String, Integer>) ((List<?>) XContentMapValues.extractValue(
+            "hits.hits._source.rare_users",
+            searchResult
+        )).get(0);
+        assertThat(commonUsers, is(not(nullValue())));
+        assertThat(commonUsers, equalTo(new HashMap<>(){{
+            put("user_10",
+                Collections.singletonMap(
+                    "common_businesses",
+                    new HashMap<>(){{
+                        put("business_12", 6);
+                        put("business_9", 4);
+            }}));
+            put("user_0", Collections.singletonMap(
+                "common_businesses",
+                new HashMap<>(){{
+                    put("business_0", 35);
+            }}));
+        }}));
+        assertThat(rareUsers, is(not(nullValue())));
+        assertThat(rareUsers, equalTo(new HashMap<>(){{
+            put("user_5", 1);
+            put("user_12", 1);
+        }}));
+    }
+
     private void assertDateHistogramPivot(String indexName) throws Exception {
         String transformId = "simple_date_histogram_pivot_" + indexName;
         String transformIndex = "pivot_reviews_via_date_histogram_" + indexName;

+ 33 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.geo.parsers.ShapeParser;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
 import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.metrics.GeoBounds;
@@ -52,6 +53,7 @@ public final class AggregationResultUtils {
         tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
         tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
         tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
+        tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
         TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
     }
 
@@ -120,6 +122,8 @@ public final class AggregationResultUtils {
             return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
         } else if (aggregation instanceof SingleBucketAggregation) {
             return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
+        } else if (aggregation instanceof MultiBucketsAggregation) {
+            return TYPE_VALUE_EXTRACTOR_MAP.get(MultiBucketsAggregation.class.getName());
         } else {
             // Execution should never reach this point!
             // Creating transforms with unsupported aggregations shall not be possible
@@ -246,6 +250,35 @@ public final class AggregationResultUtils {
         }
     }
 
+    static class MultiBucketsAggExtractor implements AggValueExtractor {
+        @Override
+        public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
+            MultiBucketsAggregation aggregation = (MultiBucketsAggregation) agg;
+
+            HashMap<String, Object> nested = new HashMap<>();
+
+            for (MultiBucketsAggregation.Bucket bucket : aggregation.getBuckets()) {
+                if (bucket.getAggregations().iterator().hasNext() == false) {
+                    nested.put(bucket.getKeyAsString(), bucket.getDocCount());
+                } else {
+                    HashMap<String, Object> nestedBucketObject = new HashMap<>();
+                    for (Aggregation subAgg : bucket.getAggregations()) {
+                        nestedBucketObject.put(
+                            subAgg.getName(),
+                            getExtractor(subAgg).value(
+                                subAgg,
+                                fieldTypeMap,
+                                lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName()
+                            )
+                        );
+                    }
+                    nested.put(bucket.getKeyAsString(), nestedBucketObject);
+                }
+            }
+            return nested;
+        }
+    }
+
     static class ScriptedMetricAggExtractor implements AggValueExtractor {
         @Override
         public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {

+ 4 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java

@@ -31,6 +31,7 @@ public final class Aggregations {
     private static final String SOURCE = "_source";
 
     public static final String FLOAT = "float";
+    public static final String FLATTENED = "flattened";
     public static final String SCALED_FLOAT = "scaled_float";
     public static final String DOUBLE = "double";
     public static final String LONG = "long";
@@ -69,14 +70,12 @@ public final class Aggregations {
         "nested",
         "percentile_ranks",
         "range",
-        "rare_terms",
         "reverse_nested",
         "sampler",
         "significant_terms", // https://github.com/elastic/elasticsearch/issues/51073
         "significant_text",
         "stats", // https://github.com/elastic/elasticsearch/issues/51925
         "string_stats", // https://github.com/elastic/elasticsearch/issues/51925
-        "terms", // https://github.com/elastic/elasticsearch/issues/51073
         "top_hits",
         "top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
         "t_test" // https://github.com/elastic/elasticsearch/issues/54503
@@ -107,7 +106,9 @@ public final class Aggregations {
         BUCKET_SELECTOR("bucket_selector", DYNAMIC),
         BUCKET_SCRIPT("bucket_script", DYNAMIC),
         PERCENTILES("percentiles", DOUBLE),
-        FILTER("filter", LONG);
+        FILTER("filter", LONG),
+        TERMS("terms", FLATTENED),
+        RARE_TERMS("rare_terms", FLATTENED);
 
         private final String aggregationType;
         private final String targetMapping;

+ 10 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationsTests.java

@@ -90,6 +90,16 @@ public class AggregationsTests extends ESTestCase {
         assertEquals("long", Aggregations.resolveTargetMapping("filter", "long"));
         assertEquals("long", Aggregations.resolveTargetMapping("filter", "double"));
 
+        // terms
+        assertEquals("flattened", Aggregations.resolveTargetMapping("terms", null));
+        assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "keyword"));
+        assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "text"));
+
+        // rare_terms
+        assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", null));
+        assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "text"));
+        assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "keyword"));
+
         // corner case: source type null
         assertEquals(null, Aggregations.resolveTargetMapping("min", null));
     }