瀏覽代碼

[Transform] adds geotile_grid support in group_by (#56514)

This adds support for grouping by geo points. This uses the agg [geotile_grid](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-geotilegrid-aggregation.html).

I am opting to store the tile results of group_by as a `geo_shape` so that users can query the results. Additionally, the shapes could be visualized and filtered in the kibana maps app.

relates to https://github.com/elastic/elasticsearch/issues/56121
Benjamin Trent 5 年之前
父節點
當前提交
1c3e450c2e
共有 15 個文件被更改,包括 664 次插入16 次删除
  1. 126 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSource.java
  2. 3 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/GroupConfig.java
  3. 2 1
      client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/SingleGroupSource.java
  4. 67 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSourceTests.java
  5. 4 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GroupConfigTests.java
  6. 72 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/GeoTileGroupSourceTests.java
  7. 8 0
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoTileUtils.java
  8. 193 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java
  9. 5 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfig.java
  10. 23 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java
  11. 49 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSourceTests.java
  12. 4 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java
  13. 68 0
      x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java
  14. 2 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java
  15. 38 10
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java

+ 126 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSource.java

@@ -0,0 +1,126 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.transform.transforms.pivot;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.geo.GeoBoundingBox;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+/*
+ * A geotile_grid aggregation source for group_by
+ */
+public class GeoTileGroupSource extends SingleGroupSource implements ToXContentObject {
+    private static final String NAME = "transform_geo_tile_group";
+
+    private static final ParseField PRECISION = new ParseField("precision");
+    private static final ConstructingObjectParser<GeoTileGroupSource, Void> PARSER = new ConstructingObjectParser<>(NAME, true,  (args) -> {
+        String field = (String) args[0];
+        Integer precision = (Integer) args[1];
+        GeoBoundingBox boundingBox = (GeoBoundingBox) args[2];
+
+        return new GeoTileGroupSource(field, precision, boundingBox);
+    });
+
+    static {
+        PARSER.declareString(optionalConstructorArg(), FIELD);
+        PARSER.declareInt(optionalConstructorArg(), PRECISION);
+        PARSER.declareField(
+            optionalConstructorArg(),
+            (p, context) -> GeoBoundingBox.parseBoundingBox(p),
+            GeoBoundingBox.BOUNDS_FIELD,
+            ObjectParser.ValueType.OBJECT
+        );
+    }
+    private final Integer precision;
+    private final GeoBoundingBox geoBoundingBox;
+
+    public GeoTileGroupSource(final String field, final Integer precision, final GeoBoundingBox boundingBox) {
+        super(field, null);
+        if (precision != null) {
+            GeoTileUtils.checkPrecisionRange(precision);
+        }
+        this.precision = precision;
+        this.geoBoundingBox = boundingBox;
+    }
+
+    @Override
+    public Type getType() {
+        return Type.GEOTILE_GRID;
+    }
+
+    public Integer getPrecision() {
+        return precision;
+    }
+
+    public GeoBoundingBox getGeoBoundingBox() {
+        return geoBoundingBox;
+    }
+
+    public static GeoTileGroupSource fromXContent(final XContentParser parser) {
+        return PARSER.apply(parser, null);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        super.innerXContent(builder, params);
+        if (precision != null) {
+            builder.field(PRECISION.getPreferredName(), precision);
+        }
+        if (geoBoundingBox != null) {
+            geoBoundingBox.toXContent(builder, params);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        final GeoTileGroupSource that = (GeoTileGroupSource) other;
+
+        return Objects.equals(this.field, that.field)
+            && Objects.equals(this.precision, that.precision)
+            && Objects.equals(this.geoBoundingBox, that.geoBoundingBox);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(field, precision, geoBoundingBox);
+    }
+
+}

+ 3 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/GroupConfig.java

@@ -97,6 +97,9 @@ public class GroupConfig implements ToXContentObject {
                 case "date_histogram":
                     groupSource = DateHistogramGroupSource.fromXContent(parser);
                     break;
+                case "geotile_grid":
+                    groupSource = GeoTileGroupSource.fromXContent(parser);
+                    break;
                 default:
                     // not a valid group source. Consume up to the dest field end object
                     consumeUntilEndObject(parser, 2);

+ 2 - 1
client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/pivot/SingleGroupSource.java

@@ -36,7 +36,8 @@ public abstract class SingleGroupSource implements ToXContentObject {
     public enum Type {
         TERMS,
         HISTOGRAM,
-        DATE_HISTOGRAM;
+        DATE_HISTOGRAM,
+        GEOTILE_GRID;
 
         public String value() {
             return name().toLowerCase(Locale.ROOT);

+ 67 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GeoTileGroupSourceTests.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.transform.transforms.pivot;
+
+import org.elasticsearch.common.geo.GeoBoundingBox;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.geo.GeometryTestUtils;
+import org.elasticsearch.geometry.Rectangle;
+import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+public class GeoTileGroupSourceTests extends AbstractXContentTestCase<GeoTileGroupSource> {
+
+    public static GeoTileGroupSource randomGeoTileGroupSource() {
+        Rectangle rectangle = GeometryTestUtils.randomRectangle();
+        return new GeoTileGroupSource(
+            randomBoolean() ? null : randomAlphaOfLength(10),
+            randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM),
+            randomBoolean() ? null : new GeoBoundingBox(
+                new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
+                new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
+            )
+        );
+    }
+
+    @Override
+    protected GeoTileGroupSource createTestInstance() {
+        return randomGeoTileGroupSource();
+    }
+
+    @Override
+    protected GeoTileGroupSource doParseInstance(XContentParser parser) throws IOException {
+        return GeoTileGroupSource.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
+    @Override
+    protected Predicate<String> getRandomFieldsExcludeFilter() {
+        // allow unknown fields in the root of the object only
+        return field -> !field.isEmpty();
+    }
+}

+ 4 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/GroupConfigTests.java

@@ -55,8 +55,11 @@ public class GroupConfigTests extends AbstractXContentTestCase<GroupConfig> {
                         groupBy = HistogramGroupSourceTests.randomHistogramGroupSource();
                         break;
                     case DATE_HISTOGRAM:
-                    default:
                         groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource();
+                        break;
+                    case GEOTILE_GRID:
+                    default:
+                        groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource();
                 }
                 groups.put(targetFieldName, groupBy);
             }

+ 72 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/GeoTileGroupSourceTests.java

@@ -0,0 +1,72 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.transform.transforms.pivot.hlrc;
+
+import org.elasticsearch.client.AbstractResponseTestCase;
+import org.elasticsearch.common.geo.GeoBoundingBox;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.geo.GeometryTestUtils;
+import org.elasticsearch.geometry.Rectangle;
+import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSource;
+
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class GeoTileGroupSourceTests extends AbstractResponseTestCase<
+    GeoTileGroupSource,
+    org.elasticsearch.client.transform.transforms.pivot.GeoTileGroupSource> {
+
+    public static GeoTileGroupSource randomGeoTileGroupSource() {
+        Rectangle rectangle = GeometryTestUtils.randomRectangle();
+        return new GeoTileGroupSource(
+            randomBoolean() ? null : randomAlphaOfLength(10),
+            randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM),
+            randomBoolean() ? null : new GeoBoundingBox(
+                new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
+                new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
+            )
+        );
+    }
+
+    @Override
+    protected GeoTileGroupSource createServerTestInstance(XContentType xContentType) {
+        return randomGeoTileGroupSource();
+    }
+
+    @Override
+    protected org.elasticsearch.client.transform.transforms.pivot.GeoTileGroupSource doParseToClientInstance(XContentParser parser) {
+        return org.elasticsearch.client.transform.transforms.pivot.GeoTileGroupSource.fromXContent(parser);
+    }
+
+    @Override
+    protected void assertInstances(
+        GeoTileGroupSource serverTestInstance,
+        org.elasticsearch.client.transform.transforms.pivot.GeoTileGroupSource clientInstance
+    ) {
+        assertThat(serverTestInstance.getField(), equalTo(clientInstance.getField()));
+        assertNull(clientInstance.getScript());
+        assertThat(serverTestInstance.getPrecision(), equalTo(clientInstance.getPrecision()));
+        assertThat(serverTestInstance.getGeoBoundingBox(), equalTo(clientInstance.getGeoBoundingBox()));
+    }
+
+}

+ 8 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoTileUtils.java

@@ -251,6 +251,14 @@ public final class GeoTileUtils {
         return toBoundingBox(hashAsInts[1], hashAsInts[2], hashAsInts[0]);
     }
 
+    /**
+     * Decode a string bucket key in "zoom/x/y" format to a bounding box of the tile corners
+     */
+    public static Rectangle toBoundingBox(String hash) {
+        int[] hashAsInts = parseHash(hash);
+        return toBoundingBox(hashAsInts[1], hashAsInts[2], hashAsInts[0]);
+    }
+
     public static Rectangle toBoundingBox(int xTile, int yTile, int precision) {
         final double tiles = validateZXY(precision, xTile, yTile);
         final double minN = Math.PI - (2.0 * Math.PI * (yTile + 1)) / tiles;

+ 193 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java

@@ -0,0 +1,193 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.transform.transforms.pivot;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.geo.GeoBoundingBox;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.geo.parsers.ShapeParser;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.geometry.Rectangle;
+import org.elasticsearch.index.mapper.GeoShapeFieldMapper;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+/*
+ * A geotile_grid aggregation source for group_by
+ */
+public class GeoTileGroupSource extends SingleGroupSource {
+    private static final String NAME = "transform_geo_tile_group";
+
+    private static final ParseField PRECISION = new ParseField("precision");
+    private static final ConstructingObjectParser<GeoTileGroupSource, Void> STRICT_PARSER = createParser(false);
+
+    private static final ConstructingObjectParser<GeoTileGroupSource, Void> LENIENT_PARSER = createParser(true);
+
+    private static ConstructingObjectParser<GeoTileGroupSource, Void> createParser(boolean lenient) {
+        ConstructingObjectParser<GeoTileGroupSource, Void> parser = new ConstructingObjectParser<>(NAME, lenient, (args) -> {
+            String field = (String) args[0];
+            Integer precision = (Integer) args[1];
+            GeoBoundingBox boundingBox = (GeoBoundingBox) args[2];
+
+            return new GeoTileGroupSource(field, precision, boundingBox);
+        });
+        parser.declareString(optionalConstructorArg(), FIELD);
+        parser.declareInt(optionalConstructorArg(), PRECISION);
+        parser.declareField(
+            optionalConstructorArg(),
+            (p, context) -> GeoBoundingBox.parseBoundingBox(p),
+            GeoBoundingBox.BOUNDS_FIELD,
+            ObjectParser.ValueType.OBJECT
+        );
+        return parser;
+    }
+    private final Integer precision;
+    private final GeoBoundingBox geoBoundingBox;
+
+    public GeoTileGroupSource(final String field, final Integer precision, final GeoBoundingBox boundingBox) {
+        super(field, null);
+        if (precision != null) {
+            GeoTileUtils.checkPrecisionRange(precision);
+        }
+        this.precision = precision;
+        this.geoBoundingBox = boundingBox;
+    }
+
+    public GeoTileGroupSource(StreamInput in) throws IOException {
+        super(in);
+        precision = in.readOptionalVInt();
+        geoBoundingBox = in.readOptionalWriteable(GeoBoundingBox::new);
+    }
+
+    @Override
+    public Type getType() {
+        return Type.GEOTILE_GRID;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeOptionalVInt(precision);
+        out.writeOptionalWriteable(geoBoundingBox);
+    }
+
+    public Integer getPrecision() {
+        return precision;
+    }
+
+    public GeoBoundingBox getGeoBoundingBox() {
+        return geoBoundingBox;
+    }
+
+    public static GeoTileGroupSource fromXContent(final XContentParser parser, boolean lenient) {
+        return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
+    }
+
+    @Override
+    public QueryBuilder getIncrementalBucketUpdateFilterQuery(
+        Set<String> changedBuckets,
+        String synchronizationField,
+        long synchronizationTimestamp
+    ) {
+        if (changedBuckets != null && changedBuckets.isEmpty() == false) {
+            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+            changedBuckets.stream()
+                .map(GeoTileUtils::toBoundingBox)
+                .map(this::toGeoQuery)
+                .forEach(boolQueryBuilder::should);
+            return boolQueryBuilder;
+        }
+        return null;
+    }
+
+    @Override
+    public boolean supportsIncrementalBucketUpdate() {
+        return true;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        super.innerXContent(builder, params);
+        if (precision != null) {
+            builder.field(PRECISION.getPreferredName(), precision);
+        }
+        if (geoBoundingBox != null) {
+            geoBoundingBox.toXContent(builder, params);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        final GeoTileGroupSource that = (GeoTileGroupSource) other;
+
+        return Objects.equals(this.field, that.field)
+            && Objects.equals(this.precision, that.precision)
+            && Objects.equals(this.geoBoundingBox, that.geoBoundingBox);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(field, precision, geoBoundingBox);
+    }
+
+    @Override
+    public String getMappingType() {
+        return GeoShapeFieldMapper.CONTENT_TYPE;
+    }
+
+    @Override
+    public Object transformBucketKey(Object key) {
+        assert key instanceof String;
+        Rectangle rectangle = GeoTileUtils.toBoundingBox(key.toString());
+        final Map<String, Object> geoShape = new HashMap<>();
+        geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), "BBOX");
+        geoShape.put(
+            ShapeParser.FIELD_COORDINATES.getPreferredName(),
+            Arrays.asList(
+                new Double[] { rectangle.getMinLon(), rectangle.getMaxLat() },
+                new Double[] { rectangle.getMaxLon(), rectangle.getMinLat() }
+            )
+        );
+        return geoShape;
+    }
+
+    private GeoBoundingBoxQueryBuilder toGeoQuery(Rectangle rectangle) {
+        return QueryBuilders.geoBoundingBoxQuery(field)
+            .setCorners(
+                new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
+                new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
+            );
+    }
+}

+ 5 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfig.java

@@ -60,6 +60,8 @@ public class GroupConfig implements Writeable, ToXContentObject {
                 return new HistogramGroupSource(stream);
             case DATE_HISTOGRAM:
                 return new DateHistogramGroupSource(stream);
+            case GEOTILE_GRID:
+                return new GeoTileGroupSource(stream);
             default:
                 throw new IOException("Unknown group type");
             }
@@ -177,6 +179,9 @@ public class GroupConfig implements Writeable, ToXContentObject {
             case DATE_HISTOGRAM:
                 groupSource = DateHistogramGroupSource.fromXContent(parser, lenient);
                 break;
+            case GEOTILE_GRID:
+                groupSource = GeoTileGroupSource.fromXContent(parser, lenient);
+                break;
             default:
                 throw new ParsingException(parser.getTokenLocation(), "invalid grouping type: " + groupType);
             }

+ 23 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.core.transform.transforms.pivot;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -32,7 +33,8 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
     public enum Type {
         TERMS(0),
         HISTOGRAM(1),
-        DATE_HISTOGRAM(2);
+        DATE_HISTOGRAM(2),
+        GEOTILE_GRID(3);
 
         private final byte id;
 
@@ -52,6 +54,8 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
                     return HISTOGRAM;
                 case 2:
                     return DATE_HISTOGRAM;
+                case 3:
+                    return GEOTILE_GRID;
                 default:
                     throw new IllegalArgumentException("unknown type");
             }
@@ -154,4 +158,22 @@ public abstract class SingleGroupSource implements Writeable, ToXContentObject {
     public String toString() {
         return Strings.toString(this, true, true);
     }
+
+    /**
+     * @return The preferred mapping type if it exists. Is nullable.
+     */
+    @Nullable
+    public String getMappingType() {
+        return null;
+    }
+
+    /**
+     * This will transform a composite aggregation bucket key into the desired format for indexing.
+     *
+     * @param key The bucket key for this group source
+     * @return the transformed bucket key for indexing
+     */
+    public Object transformBucketKey(Object key) {
+        return key;
+    }
 }

+ 49 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSourceTests.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.transform.transforms.pivot;
+
+import org.elasticsearch.common.geo.GeoBoundingBox;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.geo.GeometryTestUtils;
+import org.elasticsearch.geometry.Rectangle;
+import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+public class GeoTileGroupSourceTests extends AbstractSerializingTestCase<GeoTileGroupSource> {
+
+    public static GeoTileGroupSource randomGeoTileGroupSource() {
+        Rectangle rectangle = GeometryTestUtils.randomRectangle();
+        return new GeoTileGroupSource(
+            randomBoolean() ? null : randomAlphaOfLength(10),
+            randomBoolean() ? null : randomIntBetween(1, GeoTileUtils.MAX_ZOOM),
+            randomBoolean() ? null : new GeoBoundingBox(
+                new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
+                new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
+            )
+        );
+    }
+
+    @Override
+    protected GeoTileGroupSource doParseInstance(XContentParser parser) throws IOException {
+        return GeoTileGroupSource.fromXContent(parser, false);
+    }
+
+    @Override
+    protected GeoTileGroupSource createTestInstance() {
+        return randomGeoTileGroupSource();
+    }
+
+    @Override
+    protected Reader<GeoTileGroupSource> instanceReader() {
+        return GeoTileGroupSource::new;
+    }
+
+}

+ 4 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java

@@ -50,8 +50,11 @@ public class GroupConfigTests extends AbstractSerializingTestCase<GroupConfig> {
                     groupBy = HistogramGroupSourceTests.randomHistogramGroupSource();
                     break;
                 case DATE_HISTOGRAM:
-                default:
                     groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource();
+                    break;
+                case GEOTILE_GRID:
+                default:
+                    groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource();
                 }
 
                 source.put(targetFieldName, Collections.singletonMap(type.value(), getSource(groupBy)));

+ 68 - 0
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

@@ -25,10 +25,14 @@ import java.util.Set;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 
 public class TransformPivotRestIT extends TransformRestTestCase {
 
@@ -930,6 +934,70 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         assertEquals((4 + 15), Double.valueOf(latlon[1]), 0.000001);
     }
 
+    @SuppressWarnings("unchecked")
+    public void testPivotWithGeotileGroupBy() throws Exception {
+        String transformId = "geotile_grid_group_by";
+        String transformIndex = "geotile_grid_pivot_reviews";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
+
+        final Request createTransformRequest = createRequestWithAuth(
+            "PUT",
+            getTransformEndpoint() + transformId,
+            BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
+        );
+
+        String config = "{"
+            + " \"source\": {\"index\":\""
+            + REVIEWS_INDEX_NAME
+            + "\"},"
+            + " \"dest\": {\"index\":\""
+            + transformIndex
+            + "\"},";
+
+        config += " \"pivot\": {"
+            + "   \"group_by\": {"
+            + "     \"tile\": {"
+            + "       \"geotile_grid\": {"
+            + "         \"field\": \"location\","
+            + "         \"precision\": 12"
+            + " } } },"
+            + "   \"aggregations\": {"
+            + "     \"avg_rating\": {"
+            + "       \"avg\": {"
+            + "         \"field\": \"stars\""
+            + " } },"
+            + "     \"boundary\": {"
+            + "       \"geo_bounds\": {\"field\": \"location\"}"
+            + " } } }"
+            + "}";
+
+        createTransformRequest.setJsonEntity(config);
+        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
+        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
+
+        startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
+        assertTrue(indexExists(transformIndex));
+
+        // we expect 27 documents as there are that many tiles at this zoom
+        Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
+        assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
+
+        // Verify that the format is sane for the geo grid
+        Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?size=1");
+        Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
+        assertThat(actual, is(not(nullValue())));
+        Map<String, Object> actualObj = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
+            "hits.hits._source.tile",
+            searchResult
+        )).get(0);
+        assertThat(actualObj.get("type"), equalTo("BBOX"));
+        List<List<Double>> coordinates = (List<List<Double>>) actualObj.get("coordinates");
+        assertThat(coordinates, is(not(nullValue())));
+        assertThat(coordinates, hasSize(2));
+        assertThat(coordinates.get(0), hasSize(2));
+        assertThat(coordinates.get(1), hasSize(2));
+    }
+
     public void testPivotWithWeightedAvgAgg() throws Exception {
         String transformId = "weighted_avg_agg_transform";
         String transformIndex = "weighted_avg_pivot_reviews";

+ 2 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

@@ -81,10 +81,10 @@ public final class AggregationResultUtils {
             // - update documents
             IDGenerator idGen = new IDGenerator();
 
-            groups.getGroups().keySet().forEach(destinationFieldName -> {
+            groups.getGroups().forEach((destinationFieldName, singleGroupSource) -> {
                 Object value = bucket.getKey().get(destinationFieldName);
                 idGen.add(destinationFieldName, value);
-                updateDocument(document, destinationFieldName, value);
+                updateDocument(document, destinationFieldName, singleGroupSource.transformBucketKey(value));
             });
 
             List<String> aggNames = aggregationBuilders.stream().map(AggregationBuilder::getName).collect(Collectors.toList());

+ 38 - 10
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.transform.transforms.pivot;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
@@ -15,6 +16,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
@@ -68,10 +70,20 @@ public final class SchemaUtil {
         Map<String, String> aggregationTypes = new HashMap<>();
         // collects the fieldnames and target fieldnames used for grouping
         Map<String, String> fieldNamesForGrouping = new HashMap<>();
+        // collects the target mapping types used for grouping
+        Map<String, String> fieldTypesForGrouping = new HashMap<>();
 
         config.getGroupConfig()
             .getGroups()
-            .forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });
+            .forEach((destinationFieldName, group) -> {
+                // We will always need the field name for the grouping to create the mapping
+                fieldNamesForGrouping.put(destinationFieldName, group.getField());
+                // Sometimes the group config will supply a desired mapping as well
+                if (group.getMappingType() != null) {
+                    fieldTypesForGrouping.put(destinationFieldName, group.getMappingType());
+                }
+            });
+
 
         for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
             Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(agg);
@@ -95,7 +107,13 @@ public final class SchemaUtil {
             allFieldNames.values().toArray(new String[0]),
             ActionListener.wrap(
                 sourceMappings -> listener.onResponse(
-                    resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings)
+                    resolveMappings(
+                        aggregationSourceFieldNames,
+                        aggregationTypes,
+                        fieldNamesForGrouping,
+                        fieldTypesForGrouping,
+                        sourceMappings
+                    )
                 ),
                 listener::onFailure
             )
@@ -131,6 +149,7 @@ public final class SchemaUtil {
         Map<String, String> aggregationSourceFieldNames,
         Map<String, String> aggregationTypes,
         Map<String, String> fieldNamesForGrouping,
+        Map<String, String> fieldTypesForGrouping,
         Map<String, String> sourceMappings
     ) {
         Map<String, String> targetMapping = new HashMap<>();
@@ -140,25 +159,34 @@ public final class SchemaUtil {
             String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName);
             String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);
 
-            logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping);
+            logger.debug(() -> new ParameterizedMessage(
+                "Deduced mapping for: [{}], agg type [{}] to [{}]",
+                targetFieldName,
+                aggregationName,
+                destinationMapping
+            ));
 
             if (Aggregations.isDynamicMapping(destinationMapping)) {
-                logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
+                logger.debug(() -> new ParameterizedMessage(
+                    "Dynamic target mapping set for field [{}] and aggregation [{}]",
+                    targetFieldName,
+                    aggregationName
+                ));
             } else if (destinationMapping != null) {
                 targetMapping.put(targetFieldName, destinationMapping);
             } else {
-                logger.warn("Failed to deduce mapping for [" + targetFieldName + "], fall back to dynamic mapping.");
+                logger.warn("Failed to deduce mapping for [{}], fall back to dynamic mapping.", targetFieldName);
             }
         });
 
         fieldNamesForGrouping.forEach((targetFieldName, sourceFieldName) -> {
-            String destinationMapping = sourceMappings.get(sourceFieldName);
-            logger.debug("Deduced mapping for: [{}] to [{}]", targetFieldName, destinationMapping);
+            String destinationMapping = fieldTypesForGrouping.computeIfAbsent(targetFieldName, (s) -> sourceMappings.get(sourceFieldName));
+            logger.debug(() -> new ParameterizedMessage("Deduced mapping for: [{}] to [{}]", targetFieldName, destinationMapping));
             if (destinationMapping != null) {
                 targetMapping.put(targetFieldName, destinationMapping);
             } else {
-                logger.warn("Failed to deduce mapping for [" + targetFieldName + "], fall back to keyword.");
-                targetMapping.put(targetFieldName, "keyword");
+                logger.warn("Failed to deduce mapping for [{}], fall back to keyword.", targetFieldName);
+                targetMapping.put(targetFieldName, KeywordFieldMapper.CONTENT_TYPE);
             }
         });
 
@@ -196,7 +224,7 @@ public final class SchemaUtil {
                     // TODO: overwrites types, requires resolve if
                     // types are mixed
                     capabilitiesMap.forEach((name, capability) -> {
-                        logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
+                        logger.trace(() -> new ParameterizedMessage("Extracted type for [{}] : [{}]", fieldName, capability.getType()));
                         extractedTypes.put(fieldName, capability.getType());
                     });
                 }