Browse Source

[Transform] add support for geo_line aggregation in pivot function (#69299)

This commit adds support for the Gold+ licensed `geo_line` aggregation.

This aggregation takes a collection of `geo_point` values and constructs a line
according to some sort value. Adding to transforms allows users to create these
potentially expensive lines out of band of visualizations and then do additional aggs/queries
against the pivoted data. 

Examples would be:

"Do these daily user paths ever intersect?"
"Does this path enter and leave this area?"
Benjamin Trent 4 years ago
parent
commit
1438434b6c

+ 1 - 0
docs/reference/rest-api/common-parms.asciidoc

@@ -703,6 +703,7 @@ currently supported:
 * <<search-aggregations-bucket-filter-aggregation,Filter>>
 * <<search-aggregations-metrics-geobounds-aggregation,Geo bounds>>
 * <<search-aggregations-metrics-geocentroid-aggregation,Geo centroid>>
+* <<search-aggregations-metrics-geo-line,Geo line>>
 * <<search-aggregations-metrics-max-aggregation,Max>>
 * <<search-aggregations-metrics-median-absolute-deviation-aggregation,Median absolute deviation>>
 * <<search-aggregations-metrics-min-aggregation,Min>>

+ 23 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/spatial/search/aggregations/GeoShapeMetricAggregation.java

@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.spatial.search.aggregations;
+
+import java.util.Map;
+
+/**
+ * This interface provides a way for spatial aggs to easily provide appropriately formatted geoJSON geometry to describe their
+ * aggregated results.
+ */
+public interface GeoShapeMetricAggregation {
+    /**
+     * Provides the geometry calculated by the aggregation in an indexible format.
+     *
+     * @return geometry as a geoJSON object
+     */
+    Map<String, Object> geoJSONGeometry();
+}

+ 21 - 14
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java

@@ -13,10 +13,12 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.core.spatial.search.aggregations.GeoShapeMetricAggregation;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -24,7 +26,7 @@ import java.util.Objects;
 /**
  * A single line string representing a sorted sequence of geo-points
  */
-public class InternalGeoLine extends InternalAggregation {
+public class InternalGeoLine extends InternalAggregation implements GeoShapeMetricAggregation {
     private static final double SCALE = Math.pow(10, 6);
 
     private long[] line;
@@ -147,21 +149,9 @@ public class InternalGeoLine extends InternalAggregation {
 
     @Override
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
-        final List<double[]> coordinates = new ArrayList<>();
-        for (int i = 0; i < line.length; i++) {
-            int x = (int) (line[i] >> 32);
-            int y = (int) line[i];
-            coordinates.add(new double[] {
-                roundDegrees(GeoEncodingUtils.decodeLongitude(x)),
-                roundDegrees(GeoEncodingUtils.decodeLatitude(y))
-            });
-        }
         builder
             .field("type", "Feature")
-            .startObject("geometry")
-                .field("type", "LineString")
-                .array("coordinates", coordinates.toArray())
-            .endObject()
+            .field("geometry", geoJSONGeometry())
             .startObject("properties")
                 .field("complete", isComplete());
         if (includeSorts) {
@@ -212,4 +202,21 @@ public class InternalGeoLine extends InternalAggregation {
             && Objects.equals(size, that.size);
 
     }
+
+    @Override
+    public Map<String, Object> geoJSONGeometry() {
+        final List<double[]> coordinates = new ArrayList<>();
+        for (int i = 0; i < line.length; i++) {
+            int x = (int) (line[i] >> 32);
+            int y = (int) line[i];
+            coordinates.add(new double[] {
+                roundDegrees(GeoEncodingUtils.decodeLongitude(x)),
+                roundDegrees(GeoEncodingUtils.decodeLatitude(y))
+            });
+        }
+        final Map<String, Object> geoJSON = new HashMap<>();
+        geoJSON.put("type", "LineString");
+        geoJSON.put("coordinates", coordinates.toArray());
+        return geoJSON;
+    }
 }

+ 1 - 0
x-pack/plugin/transform/build.gradle

@@ -14,6 +14,7 @@ dependencies {
   testImplementation(testArtifact(project(xpackModule('core'))))
   testImplementation project(path: xpackModule('analytics'))
   testImplementation project(path: ':modules:aggs-matrix-stats')
+  testImplementation project(path: xpackModule('spatial'))
 }
 
 addQaCheckDependencies()

+ 60 - 0
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

@@ -31,6 +31,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -1241,6 +1242,65 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001);
     }
 
+    @SuppressWarnings("unchecked")
+    public void testPivotWithGeoLineAgg() throws Exception {
+        String transformId = "geo_line_pivot";
+        String transformIndex = "geo_line_pivot_reviews";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
+
+        final Request createTransformRequest = createRequestWithAuth(
+            "PUT",
+            getTransformEndpoint() + transformId,
+            BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
+        );
+
+        String config = "{"
+            + " \"source\": {\"index\":\""
+            + REVIEWS_INDEX_NAME
+            + "\"},"
+            + " \"dest\": {\"index\":\""
+            + transformIndex
+            + "\"},";
+
+        config += " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"reviewer\": {"
+            + "       \"terms\": {"
+            + "         \"field\": \"user_id\""
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"avg_rating\": {"
+            + "       \"avg\": {"
+            + "         \"field\": \"stars\""
+            + " } },"
+            + "     \"location\": {"
+            + "       \"geo_line\": {\"point\": {\"field\":\"location\"}, \"sort\": {\"field\": \"timestamp\"}}"
+            + " } } }"
+            + "}";
+
+        createTransformRequest.setJsonEntity(config);
+        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
+        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
+        assertTrue(indexExists(transformIndex));
+
+        // we expect 27 documents as there shall be 27 user_id's
+        Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
+        assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
+
+        // get and check some users
+        Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
+        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
+        Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
+        assertEquals(3.878048780, actual.doubleValue(), 0.000001);
+        Map<String, Object> actualString = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
+            "hits.hits._source.location",
+            searchResult
+        )).get(0);
+        assertThat(actualString, hasEntry("type", "LineString"));
+    }
+
     @SuppressWarnings("unchecked")
     public void testPivotWithGeotileGroupBy() throws Exception {
         String transformId = "geotile_grid_group_by";

+ 22 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

@@ -29,6 +29,7 @@ import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.S
 import org.elasticsearch.search.aggregations.metrics.Percentile;
 import org.elasticsearch.search.aggregations.metrics.Percentiles;
 import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
+import org.elasticsearch.xpack.core.spatial.search.aggregations.GeoShapeMetricAggregation;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSource;
@@ -61,6 +62,7 @@ public final class AggregationResultUtils {
         tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
         tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
         tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
+        tempMap.put(GeoShapeMetricAggregation.class.getName(), new GeoShapeMetricAggExtractor());
         TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
     }
 
@@ -155,6 +157,8 @@ public final class AggregationResultUtils {
             return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
         } else if (aggregation instanceof MultiBucketsAggregation) {
             return TYPE_VALUE_EXTRACTOR_MAP.get(MultiBucketsAggregation.class.getName());
+        } else if (aggregation instanceof GeoShapeMetricAggregation) {
+            return TYPE_VALUE_EXTRACTOR_MAP.get(GeoShapeMetricAggregation.class.getName());
         } else {
             // Execution should never reach this point!
             // Creating transforms with unsupported aggregations shall not be possible
@@ -210,6 +214,10 @@ public final class AggregationResultUtils {
         AggregationExtractionException(String msg, Object... args) {
             super(msg, args);
         }
+
+        AggregationExtractionException(String msg, Throwable cause, Object... args) {
+            super(msg, cause, args);
+        }
     }
 
     /**
@@ -390,6 +398,20 @@ public final class AggregationResultUtils {
         }
     }
 
+    static class GeoShapeMetricAggExtractor implements AggValueExtractor {
+
+        @Override
+        public Object value(Aggregation aggregation, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
+            assert aggregation instanceof GeoShapeMetricAggregation
+                 : "Unexpected type ["
+                        + aggregation.getClass().getName()
+                        + "] for aggregation ["
+                        + aggregation.getName()
+                        + "]";
+            return ((GeoShapeMetricAggregation) aggregation).geoJSONGeometry();
+        }
+    }
+
     static class GeoTileBucketKeyExtractor implements BucketKeyExtractor {
 
         @Override

+ 2 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

@@ -102,8 +102,9 @@ public final class TransformAggregations {
         MAX("max", SOURCE),
         MIN("min", SOURCE),
         SUM("sum", DOUBLE),
-        GEO_CENTROID("geo_centroid", GEO_POINT),
         GEO_BOUNDS("geo_bounds", GEO_SHAPE),
+        GEO_CENTROID("geo_centroid", GEO_POINT),
+        GEO_LINE("geo_line", GEO_SHAPE),
         SCRIPTED_METRIC("scripted_metric", DYNAMIC),
         WEIGHTED_AVG("weighted_avg", DOUBLE),
         BUCKET_SELECTOR("bucket_selector", DYNAMIC),

+ 18 - 2
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchModule;
@@ -38,6 +39,7 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
+import org.elasticsearch.xpack.spatial.SpatialPlugin;
 import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.transforms.Function;
 import org.elasticsearch.xpack.transform.transforms.pivot.TransformAggregations.AggregationType;
@@ -54,7 +56,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.util.Collections.emptyList;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.anyOf;
@@ -75,7 +76,7 @@ public class PivotTests extends ESTestCase {
     @Before
     public void registerAggregationNamedObjects() throws Exception {
         // register aggregations as NamedWriteable
-        SearchModule searchModule = new SearchModule(Settings.EMPTY, emptyList());
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, List.of(new TestSpatialPlugin()));
         namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
     }
 
@@ -305,6 +306,11 @@ public class PivotTests extends ESTestCase {
                 "{" + "\"pivot_filter\": {" + "  \"filter\": {" + "   \"term\": {\"field\": \"value\"}" + "  }" + "}" + "}"
             );
         }
+        if (agg.equals(AggregationType.GEO_LINE.getName())) {
+            return parseAggregations(
+                "{\"pivot_geo_line\": {\"geo_line\": {\"point\": {\"field\": \"values\"}, \"sort\":{\"field\": \"timestamp\"}}}}"
+            );
+        }
 
         return parseAggregations(
             "{\n" + "  \"pivot_" + agg + "\": {\n" + "    \"" + agg + "\": {\n" + "      \"field\": \"values\"\n" + "    }\n" + "  }" + "}"
@@ -345,4 +351,14 @@ public class PivotTests extends ESTestCase {
             fail("Expected config to be invalid");
         }
     }
+
+    // This is to pass license checks :)
+    private static class TestSpatialPlugin extends SpatialPlugin {
+
+        @Override
+        protected XPackLicenseState getLicenseState() {
+            return new XPackLicenseState(Settings.EMPTY, System::currentTimeMillis);
+        }
+
+    }
 }

+ 4 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregationsTests.java

@@ -70,6 +70,10 @@ public class TransformAggregationsTests extends ESTestCase {
         assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_bounds", "geo_shape"));
         assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_bounds", null));
 
+        // geo_line
+        assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_line", "geo_shape"));
+        assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_line", null));
+
         // scripted_metric
         assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("scripted_metric", null));
         assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("scripted_metric", "int"));