Browse Source

Add geo_line aggregation (#41612)

A metric aggregation that aggregates a set of points as 
a GeoJSON LineString ordered by some sort parameter.

#### specifics

A `geo_line` aggregation request would specify a `geo_point` field, as well
as a `sort` field. `geo_point` represents the values used in the LineString, 
while the `sort` values will be used as the total ordering of the points.

the `sort` field would support any numeric field, including date.

#### sample usage

```
{
	"query": {
		"bool": {
			"must": [
				{ "term": { "person": "004" } },
				{ "term": { "trajectory": "20090131002206.plt" } }
			]
		}
	},
	"aggs": {
		"make_line": {
			"geo_line": {
				"point": {"field": "location"},
				"sort": { "field": "timestamp" },
                                "include_sort": true,
                                "sort_order": "desc",
                                "size": 15
			}
		}
	}
}
```

#### sample response

```
{
    "took": 21,
    "timed_out": false,
    "_shards": {...},
    "hits": {...},
    "aggregations": {
        "make_line": {
            "type": "LineString",
            "coordinates": [
                [
                    121.52926194481552,
                    38.92878997139633
                ],
                [
                    121.52922699227929,
                    38.92876998055726
                ],
             ]
        }
    }
}
```

#### visual response

<img width="540" alt="Screen Shot 2019-04-26 at 9 40 07 AM" src="https://user-images.githubusercontent.com/388837/56834977-cf278e00-6827-11e9-9c93-005ed48433cc.png">

#### limitations

Due to the cardinality of points, an initial max of 10k points 
will be used. This should support many use-cases.

One solution to overcome this limitation is to keep a PriorityQueue of
points, and simplifying the line once it hits this max. If simplifying
makes sense, it may be a nice option, in general. The ability to use a parameter
to specify how aggressive one wants to simplify. This parameter could be 
the number of points. Example algorithm one could use with a PriorityQueue:
https://bost.ocks.org/mike/simplify/. This would still require O(m) space, where m
is the number of points returned. And would also require heapifying triangles
sorted by their areas, which would be O(log(m)) operations. Since sorting is done, 
anyways, simplifying would still be a O(n log(m)) operation, where n is the total number 
of points to filter........... something to explore


closes #41649
Tal Levy 4 years ago
parent
commit
b514d9bf2e
22 changed files with 1922 additions and 73 deletions
  1. 2 0
      docs/reference/aggregations/metrics.asciidoc
  2. 143 0
      docs/reference/aggregations/metrics/geoline-aggregation.asciidoc
  3. 12 12
      server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java
  4. 1 58
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java
  5. 2 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java
  6. 70 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/search/aggregations/MissingHelper.java
  7. 10 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/spatial/action/SpatialStatsAction.java
  8. 25 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java
  9. 158 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java
  10. 99 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java
  11. 63 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java
  12. 195 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineBucketedSort.java
  13. 214 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java
  14. 174 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLines.java
  15. 55 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java
  16. 60 0
      x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/support/GeoLineMultiValuesSource.java
  17. 76 0
      x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilderTests.java
  18. 229 0
      x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java
  19. 172 0
      x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLineTests.java
  20. 52 0
      x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLinesTests.java
  21. 59 0
      x-pack/plugin/spatial/src/test/resources/rest-api-spec/test/50_geoline.yml
  22. 51 0
      x-pack/plugin/spatial/src/yamlRestTest/resources/rest-api-spec/test/60_geo_line.yml

+ 2 - 0
docs/reference/aggregations/metrics.asciidoc

@@ -23,6 +23,8 @@ include::metrics/geobounds-aggregation.asciidoc[]
 
 include::metrics/geocentroid-aggregation.asciidoc[]
 
+include::metrics/geoline-aggregation.asciidoc[]
+
 include::metrics/matrix-stats-aggregation.asciidoc[]
 
 include::metrics/max-aggregation.asciidoc[]

+ 143 - 0
docs/reference/aggregations/metrics/geoline-aggregation.asciidoc

@@ -0,0 +1,143 @@
+[role="xpack"]
+[testenv="gold"]
+[[search-aggregations-metrics-geo-line]]
+=== Geo-Line Aggregation
+++++
+<titleabbrev>Geo-Line</titleabbrev>
+++++
+
+The `geo_line` aggregation aggregates all `geo_point` values within a bucket into a LineString ordered
+by the chosen `sort` field. This `sort` can be a date field, for example. The bucket returned is a valid
+https://tools.ietf.org/html/rfc7946#section-3.2[GeoJSON Feature] representing the line geometry.
+
+[source,console,id=search-aggregations-metrics-geo-line-simple]
+----
+PUT test
+{
+    "mappings": {
+        "dynamic": "strict",
+        "_source": {
+            "enabled": false
+        },
+        "properties": {
+            "my_location": {
+                "type": "geo_point"
+            },
+            "group": {
+                "type": "keyword"
+            },
+            "@timestamp": {
+                "type": "date"
+            }
+        }
+    }
+}
+
+POST /test/_bulk?refresh
+{"index": {}}
+{"my_location": {"lat":37.3450570, "lon": -122.0499820}, "@timestamp": "2013-09-06T16:00:36"}
+{"index": {}}
+{"my_location": {"lat": 37.3451320, "lon": -122.0499820}, "@timestamp": "2013-09-06T16:00:37Z"}
+{"index": {}}
+{"my_location": {"lat": 37.349283, "lon": -122.0505010}, "@timestamp": "2013-09-06T16:00:37Z"}
+
+POST /test/_search?filter_path=aggregations
+{
+  "aggs": {
+    "line": {
+      "geo_line": {
+        "point": {"field": "my_location"},
+        "sort": {"field": "@timestamp"}
+      }
+    }
+  }
+}
+----
+
+Which returns:
+
+[source,js]
+----
+{
+  "aggregations": {
+    "line": {
+      "type" : "Feature",
+      "geometry" : {
+        "type" : "LineString",
+        "coordinates" : [
+          [
+            -122.049982,
+            37.345057
+          ],
+          [
+            -122.050501,
+            37.349283
+          ],
+          [
+            -122.049982,
+            37.345132
+          ]
+        ]
+      },
+      "properties" : {
+        "complete" : true
+      }
+    }
+  }
+}
+----
+// TESTRESPONSE
+
+[[search-aggregations-metrics-geo-line-options]]
+==== Options
+
+`point`::
+(Required)
+
+This option specifies the name of the `geo_point` field
+
+Example usage configuring `my_location` as the point field:
+
+[source,js]
+----
+"point": {
+  "field": "my_location"
+}
+----
+// NOTCONSOLE
+
+`sort`::
+(Required)
+
+This option specifies the name of the numeric field to use as the sort key
+for ordering the points
+
+Example usage configuring `@timestamp` as the sort key:
+
+[source,js]
+----
+"point": {
+  "field": "@timestamp"
+}
+----
+// NOTCONSOLE
+
+`include_sort`::
+(Optional, boolean, default: `false`)
+
+This option includes, when true, an additional array of the sort values in the
+feature properties.
+
+`sort_order`::
+(Optional, string, default: `"ASC"`)
+
+This option accepts one of two values: "ASC", "DESC".
+
+The line is sorted in ascending order by the sort key when set to "ASC", and in descending
+with "DESC".
+
+`size`::
+(Optional, integer, default: `10000`)
+
+The maximum length of the line represented in the aggregation. Valid sizes are
+between one and 10000.

+ 12 - 12
server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java

@@ -66,7 +66,7 @@ import static java.util.Collections.emptyList;
  * worst case. Critically, it is a very fast {@code O(1)} to check if a value
  * is competitive at all which, so long as buckets aren't hit in reverse
  * order, they mostly won't be. Extracting results in sorted order is still
- * {@code O(n * log n)}. 
+ * {@code O(n * log n)}.
  * </p>
  * <p>
  * When we first collect a bucket we make sure that we've allocated enough
@@ -90,7 +90,7 @@ public abstract class BucketedSort implements Releasable {
          * <p>
          * Both parameters will have previously been loaded by
          * {@link Loader#loadFromDoc(long, int)} so the implementer shouldn't
-         * need to grow the underlying storage to implement this. 
+         * need to grow the underlying storage to implement this.
          * </p>
          */
         void swap(long lhs, long rhs);
@@ -128,7 +128,7 @@ public abstract class BucketedSort implements Releasable {
     private final SortOrder order;
     private final DocValueFormat format;
     private final int bucketSize;
-    private final ExtraData extra;
+    protected final ExtraData extra;
     /**
      * {@code true} if the bucket is in heap mode, {@code false} if
      * it is still gathering.
@@ -206,9 +206,9 @@ public abstract class BucketedSort implements Releasable {
     }
 
     /**
-     * Is this bucket a min heap {@code true} or in gathering mode {@code false}? 
+     * Is this bucket a min heap {@code true} or in gathering mode {@code false}?
      */
-    private boolean inHeapMode(long bucket) {
+    public boolean inHeapMode(long bucket) {
         return heapMode.get(bucket);
     }
 
@@ -254,7 +254,7 @@ public abstract class BucketedSort implements Releasable {
     /**
      * {@code true} if the entry at index {@code lhs} is "better" than
      * the entry at {@code rhs}. "Better" in this means "lower" for
-     * {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}. 
+     * {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
      */
     protected abstract boolean betterThan(long lhs, long rhs);
 
@@ -283,7 +283,7 @@ public abstract class BucketedSort implements Releasable {
 
     /**
      * Initialize the gather offsets after setting up values. Subclasses
-     * should call this once, after setting up their {@link #values()}.  
+     * should call this once, after setting up their {@link #values()}.
      */
     protected final void initGatherOffsets() {
         setNextGatherOffsets(0);
@@ -325,12 +325,12 @@ public abstract class BucketedSort implements Releasable {
      * case.
      * </p>
      * <ul>
-     * <li>Hayward, Ryan; McDiarmid, Colin (1991).  
+     * <li>Hayward, Ryan; McDiarmid, Colin (1991).
      * <a href="https://web.archive.org/web/20160205023201/http://www.stats.ox.ac.uk/__data/assets/pdf_file/0015/4173/heapbuildjalg.pdf">
      * Average Case Analysis of Heap Building byRepeated Insertion</a> J. Algorithms.
      * <li>D.E. Knuth, ”The Art of Computer Programming, Vol. 3, Sorting and Searching”</li>
      * </ul>
-     * @param rootIndex the index the start of the bucket 
+     * @param rootIndex the index the start of the bucket
      */
     private void heapify(long rootIndex) {
         int maxParent = bucketSize / 2 - 1;
@@ -344,7 +344,7 @@ public abstract class BucketedSort implements Releasable {
      * runs in {@code O(log n)} time.
      * @param rootIndex index of the start of the bucket
      * @param parent Index within the bucket of the parent to check.
-     *               For example, 0 is the "root". 
+     *               For example, 0 is the "root".
      */
     private void downHeap(long rootIndex, int parent) {
         while (true) {
@@ -443,7 +443,7 @@ public abstract class BucketedSort implements Releasable {
         /**
          * {@code true} if the sort value for the doc is "better" than the
          * entry at {@code index}. "Better" in means is "lower" for
-         * {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}. 
+         * {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
          */
         protected abstract boolean docBetterThan(long index);
 
@@ -545,7 +545,7 @@ public abstract class BucketedSort implements Releasable {
          * The maximum size of buckets this can store. This is because we
          * store the next offset to write to in a float and floats only have
          * {@code 23} bits of mantissa so they can't accurate store values
-         * higher than {@code 2 ^ 24}. 
+         * higher than {@code 2 ^ 24}.
          */
         public static final int MAX_BUCKET_SIZE = (int) Math.pow(2, 24);
 

+ 1 - 58
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java

@@ -32,6 +32,7 @@ import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.search.sort.SortValue;
+import org.elasticsearch.xpack.core.common.search.aggregations.MissingHelper;
 import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.MetricValue;
 
 import java.io.IOException;
@@ -495,62 +496,4 @@ class TopMetricsAggregator extends NumericMetricsAggregator.MultiValue {
         public void close() {}
     }
 
-    /**
-     * Helps {@link LongMetricValues} track "empty" slots. It attempts to have
-     * very low CPU overhead and no memory overhead when there *aren't* empty
-     * values.
-     */
-    private static class MissingHelper implements Releasable {
-        private final BigArrays bigArrays;
-        private BitArray tracker;
-
-        MissingHelper(BigArrays bigArrays) {
-            this.bigArrays = bigArrays;
-        }
-
-        void markMissing(long index) {
-            if (tracker == null) {
-                tracker = new BitArray(index, bigArrays);
-            }
-            tracker.set(index);
-        }
-
-        void markNotMissing(long index) {
-            if (tracker == null) {
-                return;
-            }
-            tracker.clear(index);
-        }
-
-        void swap(long lhs, long rhs) {
-            if (tracker == null) {
-                return;
-            }
-            boolean backup = tracker.get(lhs);
-            if (tracker.get(rhs)) {
-                tracker.set(lhs);
-            } else {
-                tracker.clear(lhs);
-            }
-            if (backup) {
-                tracker.set(rhs);
-            } else {
-                tracker.clear(rhs);
-            }
-        }
-
-        boolean isEmpty(long index) {
-            if (tracker == null) {
-                return false;
-            }
-            return tracker.get(index);
-        }
-
-        @Override
-        public void close() {
-            if (tracker != null) {
-                tracker.close();
-            }
-        }
-    }
 }

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java

@@ -96,6 +96,8 @@ public class XPackLicenseState {
 
         SPATIAL_GEO_GRID(OperationMode.GOLD, true),
 
+        SPATIAL_GEO_LINE(OperationMode.GOLD, true),
+
         ANALYTICS(OperationMode.MISSING, true),
 
         SEARCHABLE_SNAPSHOTS(OperationMode.ENTERPRISE, true);

+ 70 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/search/aggregations/MissingHelper.java

@@ -0,0 +1,70 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.common.search.aggregations;
+
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BitArray;
+
+/**
+ * Helps long-valued {@link org.elasticsearch.search.sort.BucketedSort.ExtraData} track "empty" slots. It attempts to have
+ * very low CPU overhead and no memory overhead when there *aren't* empty
+ * values.
+ */
+public class MissingHelper implements Releasable {
+    private final BigArrays bigArrays;
+    private BitArray tracker;
+
+    public MissingHelper(BigArrays bigArrays) {
+        this.bigArrays = bigArrays;
+    }
+
+    public void markMissing(long index) {
+        if (tracker == null) {
+            tracker = new BitArray(index, bigArrays);
+        }
+        tracker.set(index);
+    }
+
+    public void markNotMissing(long index) {
+        if (tracker == null) {
+            return;
+        }
+        tracker.clear(index);
+    }
+
+    public void swap(long lhs, long rhs) {
+        if (tracker == null) {
+            return;
+        }
+        boolean backup = tracker.get(lhs);
+        if (tracker.get(rhs)) {
+            tracker.set(lhs);
+        } else {
+            tracker.clear(lhs);
+        }
+        if (backup) {
+            tracker.set(rhs);
+        } else {
+            tracker.clear(rhs);
+        }
+    }
+
+    public boolean isEmpty(long index) {
+        if (tracker == null) {
+            return false;
+        }
+        return tracker.get(index);
+    }
+
+    @Override
+    public void close() {
+        if (tracker != null) {
+            tracker.close();
+        }
+    }
+}

+ 10 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/spatial/action/SpatialStatsAction.java

@@ -38,6 +38,7 @@ public class SpatialStatsAction extends ActionType<SpatialStatsAction.Response>
      * Items to track. Serialized by ordinals. Append only, don't remove or change order of items in this list.
      */
     public enum Item {
+        GEOLINE
     }
 
     public static class Request extends BaseNodesRequest<Request> implements ToXContentObject {
@@ -115,9 +116,15 @@ public class SpatialStatsAction extends ActionType<SpatialStatsAction.Response>
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             EnumCounters<Item> stats = getStats();
-            builder.startObject("stats");
-            for (Item item : Item.values()) {
-                builder.field(item.name().toLowerCase(Locale.ROOT) + "_usage", stats.get(item));
+            builder.startObject();
+            {
+                builder.startObject("stats");
+                {
+                    for (Item item : Item.values()) {
+                        builder.field(item.name().toLowerCase(Locale.ROOT) + "_usage", stats.get(item));
+                    }
+                }
+                builder.endObject();
             }
             builder.endObject();
             return builder;

+ 25 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.spatial;
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.xcontent.ContextParser;
 import org.elasticsearch.geo.GeoPlugin;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.ingest.Processor;
@@ -38,6 +39,8 @@ import org.elasticsearch.xpack.spatial.index.mapper.PointFieldMapper;
 import org.elasticsearch.xpack.spatial.index.mapper.ShapeFieldMapper;
 import org.elasticsearch.xpack.spatial.index.query.ShapeQueryBuilder;
 import org.elasticsearch.xpack.spatial.ingest.CircleProcessor;
+import org.elasticsearch.xpack.spatial.search.aggregations.GeoLineAggregationBuilder;
+import org.elasticsearch.xpack.spatial.search.aggregations.InternalGeoLine;
 import org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid.BoundedGeoHashGridTiler;
 import org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid.BoundedGeoTileGridTiler;
 import org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid.GeoGridTiler;
@@ -60,6 +63,7 @@ import java.util.function.Consumer;
 import static java.util.Collections.singletonList;
 
 public class SpatialPlugin extends GeoPlugin implements ActionPlugin, MapperPlugin, SearchPlugin, IngestPlugin {
+   private final SpatialUsage usage = new SpatialUsage();
 
     // to be overriden by tests
     protected XPackLicenseState getLicenseState() {
@@ -99,6 +103,18 @@ public class SpatialPlugin extends GeoPlugin implements ActionPlugin, MapperPlug
         );
     }
 
+    @Override
+    public List<AggregationSpec> getAggregations() {
+        return List.of(
+            new AggregationSpec(
+                    GeoLineAggregationBuilder.NAME,
+                    GeoLineAggregationBuilder::new,
+                    usage.track(SpatialStatsAction.Item.GEOLINE,
+                        checkLicense(GeoLineAggregationBuilder.PARSER, XPackLicenseState.Feature.SPATIAL_GEO_LINE)))
+                .addResultReader(InternalGeoLine::new)
+                .setAggregatorRegistrar(GeoLineAggregationBuilder::registerUsage));
+    }
+
     @Override
     public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
         return Map.of(CircleProcessor.TYPE, new CircleProcessor.Factory());
@@ -179,4 +195,13 @@ public class SpatialPlugin extends GeoPlugin implements ActionPlugin, MapperPlug
     private static void registerCardinalityAggregator(ValuesSourceRegistry.Builder builder) {
         builder.register(CardinalityAggregationBuilder.REGISTRY_KEY, GeoShapeValuesSourceType.instance(), CardinalityAggregator::new, true);
     }
+
+    private <T> ContextParser<String, T> checkLicense(ContextParser<String, T> realParser, XPackLicenseState.Feature feature) {
+        return (parser, name) -> {
+            if (getLicenseState().checkFeature(feature) == false) {
+                throw LicenseUtils.newComplianceException(feature.name());
+            }
+            return realParser.parse(parser, name);
+        };
+    }
 }

+ 158 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java

@@ -0,0 +1,158 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceParseHelper;
+import org.elasticsearch.search.aggregations.support.ValueType;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+public class GeoLineAggregationBuilder
+    extends MultiValuesSourceAggregationBuilder.LeafOnly<GeoLineAggregationBuilder> {
+
+    static final ParseField POINT_FIELD = new ParseField("point");
+    static final ParseField SORT_FIELD = new ParseField("sort");
+    static final ParseField ORDER_FIELD = new ParseField("sort_order");
+    static final ParseField INCLUDE_SORT_FIELD = new ParseField("include_sort");
+    static final ParseField SIZE_FIELD = new ParseField("size");
+
+    public static final String NAME = "geo_line";
+
+    public static final ObjectParser<GeoLineAggregationBuilder, String> PARSER =
+        ObjectParser.fromBuilder(NAME, GeoLineAggregationBuilder::new);
+    static {
+        MultiValuesSourceParseHelper.declareCommon(PARSER, true, ValueType.NUMERIC);
+        MultiValuesSourceParseHelper.declareField(POINT_FIELD.getPreferredName(), PARSER, true, false, false);
+        MultiValuesSourceParseHelper.declareField(SORT_FIELD.getPreferredName(), PARSER, true, false, false);
+        PARSER.declareString((builder, order) -> builder.sortOrder(SortOrder.fromString(order)), ORDER_FIELD);
+        PARSER.declareBoolean(GeoLineAggregationBuilder::includeSort, INCLUDE_SORT_FIELD);
+        PARSER.declareInt(GeoLineAggregationBuilder::size, SIZE_FIELD);
+    }
+
+    private boolean includeSort;
+    private SortOrder sortOrder = SortOrder.ASC;
+    private int size = MAX_PATH_SIZE;
+    static final int MAX_PATH_SIZE = 10000;
+
+    public static void registerUsage(ValuesSourceRegistry.Builder builder) {
+        builder.registerUsage(NAME, CoreValuesSourceType.GEOPOINT);
+    }
+
+    public GeoLineAggregationBuilder(String name) {
+        super(name);
+    }
+
+    private GeoLineAggregationBuilder(GeoLineAggregationBuilder clone,
+                                      AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metaData) {
+        super(clone, factoriesBuilder, metaData);
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public GeoLineAggregationBuilder(StreamInput in) throws IOException {
+        super(in);
+        sortOrder = SortOrder.readFromStream(in);
+        includeSort = in.readBoolean();
+        size = in.readVInt();
+    }
+
+    public GeoLineAggregationBuilder includeSort(boolean includeSort) {
+        this.includeSort = includeSort;
+        return this;
+    }
+
+    public GeoLineAggregationBuilder sortOrder(SortOrder sortOrder) {
+        this.sortOrder = sortOrder;
+        return this;
+    }
+
+    public GeoLineAggregationBuilder size(int size) {
+        if (size <= 0 || size > MAX_PATH_SIZE) {
+            throw new IllegalArgumentException("invalid [size] value [" + size + "] must be a positive integer <= "
+                + MAX_PATH_SIZE);
+        }
+        this.size = size;
+        return this;
+    }
+
+    @Override
+    protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metaData) {
+        return new GeoLineAggregationBuilder(this, factoriesBuilder, metaData);
+    }
+
+    @Override
+    public BucketCardinality bucketCardinality() {
+        return BucketCardinality.NONE;
+    }
+
+    @Override
+    protected void innerWriteTo(StreamOutput out) throws IOException {
+        sortOrder.writeTo(out);
+        out.writeBoolean(includeSort);
+        out.writeVInt(size);
+    }
+
+    @Override
+    protected ValuesSourceType defaultValueSourceType() {
+        return CoreValuesSourceType.NUMERIC;
+    }
+
+    @Override
+    protected MultiValuesSourceAggregatorFactory innerBuild(AggregationContext aggregationContext,
+                                                            Map<String, ValuesSourceConfig> configs,
+                                                            Map<String, QueryBuilder> filters,
+                                                            DocValueFormat format,
+                                                            AggregatorFactory parent,
+                                                            AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
+        return new GeoLineAggregatorFactory(name, configs, format, aggregationContext, parent, subFactoriesBuilder, metadata,
+            includeSort, sortOrder, size);
+    }
+
+    public GeoLineAggregationBuilder point(MultiValuesSourceFieldConfig pointConfig) {
+        pointConfig = Objects.requireNonNull(pointConfig, "Configuration for field [" + POINT_FIELD + "] cannot be null");
+        field(POINT_FIELD.getPreferredName(), pointConfig);
+        return this;
+    }
+
+    public GeoLineAggregationBuilder sort(MultiValuesSourceFieldConfig sortConfig) {
+        sortConfig = Objects.requireNonNull(sortConfig, "Configuration for field [" + SORT_FIELD + "] cannot be null");
+        field(SORT_FIELD.getPreferredName(), sortConfig);
+        return this;
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, ToXContent.Params params) {
+        return builder;
+    }
+
+    @Override
+    public String getType() {
+        return NAME;
+    }
+}

+ 99 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java

@@ -0,0 +1,99 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.ScoreMode;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
+import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoLineMultiValuesSource;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.sort.BucketedSort;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Metric Aggregation for joining sorted geo_point values into a single path
+ **/
+final class GeoLineAggregator extends MetricsAggregator {
+    /** Multiple ValuesSource with field names */
+    private final GeoLineMultiValuesSource valuesSources;
+
+    private final GeoLineBucketedSort sort;
+    private final GeoLineBucketedSort.Extra extra;
+    private final boolean includeSorts;
+    private final SortOrder sortOrder;
+    private final int size;
+
+    GeoLineAggregator(String name, GeoLineMultiValuesSource valuesSources, SearchContext context,
+                      Aggregator parent, Map<String,Object> metaData, boolean includeSorts, SortOrder sortOrder,
+                      int size) throws IOException {
+        super(name, context, parent, metaData);
+        this.valuesSources = valuesSources;
+        if (valuesSources != null) {
+            this.extra = new GeoLineBucketedSort.Extra(context.bigArrays(), valuesSources);
+            this.sort = new GeoLineBucketedSort(context.bigArrays(), sortOrder, null, size, valuesSources, extra);
+        } else {
+            this.extra = null;
+            this.sort = null;
+        }
+        this.includeSorts = includeSorts;
+        this.sortOrder = sortOrder;
+        this.size = size;
+    }
+
+    @Override
+    public ScoreMode scoreMode() {
+        if (valuesSources != null && valuesSources.needsScores()) {
+            return ScoreMode.COMPLETE;
+        }
+        return super.scoreMode();
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
+                                                final LeafBucketCollector sub) throws IOException {
+        if (valuesSources == null) {
+            return LeafBucketCollector.NO_OP_COLLECTOR;
+        }
+        BucketedSort.Leaf leafSort = sort.forLeaf(ctx);
+
+        return new LeafBucketCollector(){
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                leafSort.collect(doc, bucket);
+            }
+        };
+    }
+
+    @Override
+    public InternalAggregation buildAggregation(long bucket) {
+        if (valuesSources == null) {
+            return buildEmptyAggregation();
+        }
+        boolean complete = sort.inHeapMode(bucket) == false;
+        addRequestCircuitBreakerBytes((Double.SIZE + Long.SIZE) * sort.sizeOf(bucket));
+        double[] sortVals = sort.getSortValues(bucket);
+        long[] bucketLine = sort.getPoints(bucket);
+        new PathArraySorter(bucketLine, sortVals, sortOrder).sort();
+        return new InternalGeoLine(name, bucketLine, sortVals, metadata(), complete, includeSorts, sortOrder, size);
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalGeoLine(name, null, null, metadata(), true, includeSorts, sortOrder, size);
+    }
+
+    @Override
+    public void doClose() {
+        Releasables.close(sort, extra);
+    }
+}

+ 63 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java

@@ -0,0 +1,63 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoLineMultiValuesSource;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.Map;
+
+final class GeoLineAggregatorFactory extends MultiValuesSourceAggregatorFactory {
+
+    private boolean includeSort;
+    private SortOrder sortOrder;
+    private int size;
+
+    GeoLineAggregatorFactory(String name,
+                             Map<String, ValuesSourceConfig> configs,
+                             DocValueFormat format, AggregationContext aggregationContext, AggregatorFactory parent,
+                             AggregatorFactories.Builder subFactoriesBuilder,
+                             Map<String, Object> metaData, boolean includeSort, SortOrder sortOrder, int size) throws IOException {
+        super(name, configs, format, aggregationContext, parent, subFactoriesBuilder, metaData);
+        this.includeSort = includeSort;
+        this.sortOrder = sortOrder;
+        this.size = size;
+    }
+
+    @Override
+    protected Aggregator createUnmapped(SearchContext searchContext,
+                                        Aggregator parent,
+                                        Map<String, Object> metaData) throws IOException {
+        return new GeoLineAggregator(name, null, searchContext, parent, metaData, includeSort, sortOrder, size);
+    }
+
+    @Override
+    protected Aggregator doCreateInternal(SearchContext searchContext,
+                                          Map<String, ValuesSourceConfig> configs,
+                                          DocValueFormat format,
+                                          Aggregator parent,
+                                          CardinalityUpperBound cardinality,
+                                          Map<String, Object> metaData) throws IOException {
+        GeoLineMultiValuesSource valuesSources =
+            new GeoLineMultiValuesSource(configs, searchContext.getQueryShardContext());
+        return new GeoLineAggregator(name, valuesSources, searchContext, parent, metaData, includeSort, sortOrder, size);
+    }
+
+    @Override
+    public String getStatsSubtype() {
+        return configs.get(GeoLineAggregationBuilder.POINT_FIELD.getPreferredName()).valueSourceType().typeName();
+    }
+}

+ 195 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineBucketedSort.java

@@ -0,0 +1,195 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.geo.GeoEncodingUtils;
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.DoubleArray;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.index.fielddata.MultiGeoPointValues;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoLineMultiValuesSource;
+import org.elasticsearch.search.sort.BucketedSort;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.core.common.search.aggregations.MissingHelper;
+
+import java.io.IOException;
+
+import static org.elasticsearch.xpack.spatial.search.aggregations.GeoLineAggregationBuilder.SORT_FIELD;
+
+/**
+ * A bigArrays sorter of both a geo_line's sort-values and points.
+ *
+ * This class accumulates geo_points within buckets and heapifies the
+ * bucket based on whether there are too many items in the bucket that
+ * need to be dropped based on their sort value.
+ */
+public class GeoLineBucketedSort extends BucketedSort.ForDoubles {
+    private final GeoLineMultiValuesSource valuesSources;
+
+    public GeoLineBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format, int bucketSize,
+                               GeoLineMultiValuesSource valuesSources, GeoLineBucketedSort.Extra extra) {
+        super(bigArrays, sortOrder, format, bucketSize, extra);
+        this.valuesSources = valuesSources;
+    }
+
+    public long sizeOf(long bucket) {
+        int bucketSize = getBucketSize();
+        long rootIndex = bucket * bucketSize;
+        if (rootIndex >= values().size()) {
+            // We've never seen this bucket.
+            return 0;
+        }
+        long start = inHeapMode(bucket) ? rootIndex : (rootIndex + getNextGatherOffset(rootIndex) + 1);
+        long end = rootIndex + bucketSize;
+        return end - start;
+    }
+
+    /**
+     * @param bucket the bucket ordinal
+     * @return the array of sort-values for the specific bucket. This array may not necessarily be heapified already, so no ordering is
+     *         guaranteed.
+     */
+    public double[] getSortValues(long bucket) {
+        int bucketSize = getBucketSize();
+        long rootIndex = bucket * bucketSize;
+        if (rootIndex >= values().size()) {
+            // We've never seen this bucket.
+            return new double[]{};
+        }
+        long start = inHeapMode(bucket) ? rootIndex : (rootIndex + getNextGatherOffset(rootIndex) + 1);
+        long end = rootIndex + bucketSize;
+        double[] result = new double[(int)(end - start)];
+        int i = 0;
+        for (long index = start; index < end; index++) {
+            double timestampValue = ((DoubleArray)values()).get(index);
+            result[i++] = timestampValue;
+        }
+        return result;
+    }
+
+    /**
+     * @param bucket the bucket ordinal
+     * @return the array of points, ordered by the their respective sort-value for the specific bucket.
+     */
+    public long[] getPoints(long bucket) {
+        int bucketSize = getBucketSize();
+        long rootIndex = bucket * bucketSize;
+        if (rootIndex >= values().size()) {
+            // We've never seen this bucket.
+            return new long[]{};
+        }
+        long start = inHeapMode(bucket) ? rootIndex : (rootIndex + getNextGatherOffset(rootIndex) + 1);
+        long end = rootIndex + bucketSize;
+        long[] result = new long[(int)(end - start)];
+        int i = 0;
+        for (long index = start; index < end; index++) {
+            long geoPointValue = ((Extra) extra).values.get(index);
+            result[i++] = geoPointValue;
+        }
+        return result;
+    }
+
+    @Override
+    public BucketedSort.Leaf forLeaf(LeafReaderContext ctx) throws IOException {
+        return new BucketedSort.ForDoubles.Leaf(ctx) {
+            private final SortedNumericDoubleValues docSortValues = valuesSources.getNumericField(SORT_FIELD.getPreferredName(), ctx);
+            private double docValue;
+
+            @Override
+            protected boolean advanceExact(int doc) throws IOException {
+                if (docSortValues.advanceExact(doc)) {
+                    if (docSortValues.docValueCount() > 1) {
+                        throw new AggregationExecutionException("Encountered more than one sort value for a " +
+                            "single document. Use a script to combine multiple sort-values-per-doc into a single value.");
+                    }
+
+                    // There should always be one weight if advanceExact lands us here, either
+                    // a real weight or a `missing` weight
+                    assert docSortValues.docValueCount() == 1;
+                    docValue = docSortValues.nextValue();
+                    return true;
+                } else {
+                    docValue = Long.MIN_VALUE;
+                }
+                return false;
+            }
+
+            @Override
+            protected double docValue() {
+                return docValue;
+            }
+        };
+    }
+
+    /**
+     * An {@link BucketedSort.ExtraData} representing the geo-point for a document
+     * within a bucket.
+     */
+    static class Extra implements BucketedSort.ExtraData, Releasable {
+
+        private final BigArrays bigArrays;
+        private final GeoLineMultiValuesSource valuesSources;
+        private LongArray values;
+        private final MissingHelper empty;
+
+        Extra(BigArrays bigArrays, GeoLineMultiValuesSource valuesSources) {
+            this.bigArrays = bigArrays;
+            this.valuesSources = valuesSources;
+            this.values = bigArrays.newLongArray(1, false);
+            this.empty = new MissingHelper(bigArrays);
+        }
+
+        @Override
+        public void swap(long lhs, long rhs) {
+            long tmp = values.get(lhs);
+            values.set(lhs, values.get(rhs));
+            values.set(rhs, tmp);
+            empty.swap(lhs, rhs);
+        }
+
+        @Override
+        public Loader loader(LeafReaderContext ctx) throws IOException {
+            final MultiGeoPointValues docGeoPointValues = valuesSources
+                .getGeoPointField(GeoLineAggregationBuilder.POINT_FIELD.getPreferredName(), ctx);
+            return (index, doc) -> {
+                if (false == docGeoPointValues.advanceExact(doc)) {
+                    empty.markMissing(index);
+                    return;
+                }
+
+                if (docGeoPointValues.docValueCount() > 1) {
+                    throw new AggregationExecutionException("Encountered more than one geo_point value for a " +
+                        "single document. Use a script to combine multiple geo_point-values-per-doc into a single value.");
+                }
+
+                if (index > values.size()) {
+                    values = bigArrays.grow(values, index + 1);
+                }
+
+                final GeoPoint point = docGeoPointValues.nextValue();
+                int encodedLat = GeoEncodingUtils.encodeLatitude(point.lat());
+                int encodedLon = GeoEncodingUtils.encodeLongitude(point.lon());
+                long lonLat = (((long) encodedLon) << 32) | encodedLat & 0xffffffffL;
+
+                values.set(index, lonLat);
+                empty.markNotMissing(index);
+            };
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(values, empty);
+        }
+    }
+}

+ 214 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java

@@ -0,0 +1,214 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.geo.GeoEncodingUtils;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A single line string representing a sorted sequence of geo-points
+ */
+public class InternalGeoLine extends InternalAggregation {
+    private static final double SCALE = Math.pow(10, 6);
+
+    private long[] line;
+    private double[] sortVals;
+    private boolean complete;
+    private boolean includeSorts;
+    private SortOrder sortOrder;
+    private int size;
+
+    /**
+     * A geo_line representing the bucket for a {@link GeoLineAggregationBuilder}. The values of <code>line</code> and <code>sortVals</code>
+     * are expected to be sorted using <code>sortOrder</code>.
+     *
+     * @param name            the name of the aggregation
+     * @param line            the ordered geo-points representing the line
+     * @param sortVals        the ordered sort-values associated with the points in the line (e.g. timestamp)
+     * @param metadata        the aggregation's metadata
+     * @param complete        true iff the line is representative of all the points that fall within the bucket. False otherwise.
+     * @param includeSorts    true iff the sort-values should be rendered in xContent as properties of the line-string. False otherwise.
+     * @param sortOrder       the {@link SortOrder} for the line. Whether the points are to be plotted in asc or desc order
+     * @param size            the max length of the line-string.
+     */
+    InternalGeoLine(String name, long[] line, double[] sortVals, Map<String, Object> metadata, boolean complete,
+                    boolean includeSorts, SortOrder sortOrder, int size) {
+        super(name, metadata);
+        this.line = line;
+        this.sortVals = sortVals;
+        this.complete = complete;
+        this.includeSorts = includeSorts;
+        this.sortOrder = sortOrder;
+        this.size = size;
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public InternalGeoLine(StreamInput in) throws IOException {
+        super(in);
+        this.line = in.readLongArray();
+        this.sortVals = in.readDoubleArray();
+        this.complete = in.readBoolean();
+        this.includeSorts = in.readBoolean();
+        this.sortOrder = SortOrder.readFromStream(in);
+        this.size = in.readVInt();
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeLongArray(line);
+        out.writeDoubleArray(sortVals);
+        out.writeBoolean(complete);
+        out.writeBoolean(includeSorts);
+        sortOrder.writeTo(out);
+        out.writeVInt(size);
+    }
+
+    @Override
+    public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        int mergedSize = 0;
+        boolean complete = true;
+        boolean includeSorts = true;
+        List<InternalGeoLine> internalGeoLines = new ArrayList<>(aggregations.size());
+        for (InternalAggregation aggregation : aggregations) {
+            InternalGeoLine geoLine = (InternalGeoLine) aggregation;
+            internalGeoLines.add(geoLine);
+            mergedSize += geoLine.line.length;
+            complete &= geoLine.complete;
+            includeSorts &= geoLine.includeSorts;
+        }
+        complete &= mergedSize <= size;
+        int finalSize = Math.min(mergedSize, size);
+
+        MergedGeoLines mergedGeoLines = new MergedGeoLines(internalGeoLines, finalSize, sortOrder);
+        mergedGeoLines.merge();
+        // the final reduce should always be in ascending order
+        if (reduceContext.isFinalReduce() && SortOrder.DESC.equals(sortOrder)) {
+            new PathArraySorter(mergedGeoLines.getFinalPoints(), mergedGeoLines.getFinalSortValues(), SortOrder.ASC).sort();
+        }
+        return new InternalGeoLine(name, mergedGeoLines.getFinalPoints(), mergedGeoLines.getFinalSortValues(), getMetadata(), complete,
+            includeSorts, sortOrder, size);
+    }
+
+    @Override
+    protected boolean mustReduceOnSingleInternalAgg() {
+        return true;
+    }
+
+    @Override
+    public String getWriteableName() {
+        return GeoLineAggregationBuilder.NAME;
+    }
+
+    public long[] line() {
+        return line;
+    }
+
+    public double[] sortVals() {
+        return sortVals;
+    }
+
+    public int length() {
+        return line.length;
+    }
+
+    public boolean isComplete() {
+        return complete;
+    }
+
+    public boolean includeSorts() {
+        return includeSorts;
+    }
+
+    public SortOrder sortOrder() {
+        return sortOrder;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        final List<double[]> coordinates = new ArrayList<>();
+        for (int i = 0; i < line.length; i++) {
+            int x = (int) (line[i] >> 32);
+            int y = (int) line[i];
+            coordinates.add(new double[] {
+                roundDegrees(GeoEncodingUtils.decodeLongitude(x)),
+                roundDegrees(GeoEncodingUtils.decodeLatitude(y))
+            });
+        }
+        builder
+            .field("type", "Feature")
+            .startObject("geometry")
+                .field("type", "LineString")
+                .array("coordinates", coordinates.toArray())
+            .endObject()
+            .startObject("properties")
+                .field("complete", isComplete());
+        if (includeSorts) {
+            builder.field("sort_values", sortVals);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    private double roundDegrees(double degree) {
+        return Math.round(degree * SCALE) / SCALE;
+    }
+
+    @Override
+    public Object getProperty(List<String> path) {
+        if (path.isEmpty()) {
+            return this;
+        } else if (path.size() == 1 && "value".equals(path.get(0))) {
+            return line;
+        } else {
+            throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), Arrays.hashCode(line), Arrays.hashCode(sortVals), complete, includeSorts, sortOrder, size);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null || getClass() != obj.getClass()) return false;
+        if (super.equals(obj) == false) return false;
+
+        InternalGeoLine that = (InternalGeoLine) obj;
+        return super.equals(obj)
+            && Arrays.equals(line, that.line)
+            && Arrays.equals(sortVals, that.sortVals)
+            && Objects.equals(complete, that.complete)
+            && Objects.equals(includeSorts, that.includeSorts)
+            && Objects.equals(sortOrder, that.sortOrder)
+            && Objects.equals(size, that.size);
+
+    }
+}

+ 174 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLines.java

@@ -0,0 +1,174 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.util.List;
+
+/**
+ * Class to merge an arbitrary list of {@link InternalGeoLine} lines into a new line
+ * with the appropriate max length. The final point and sort values can be found in
+ * finalPoints and finalSortValues after merge is called.
+ */
+final class MergedGeoLines {
+
+    private final List<InternalGeoLine> geoLines;
+    private final int capacity;
+    private final SortOrder sortOrder;
+    private final int[] lineIndices; // index of which geoLine item represents
+    private final int[] idxsWithinLine; // index within the geoLine for the item
+    private int size;
+    private final long[] finalPoints;       // the final sorted list of points, sorted by their respective sort-values. valid after merge
+    private final double[] finalSortValues; // the final sorted list of sort-values. valid after merge.
+
+    MergedGeoLines(List<InternalGeoLine> geoLines, int finalLength, SortOrder sortOrder) {
+        this.geoLines = geoLines;
+        this.capacity = geoLines.size();
+        this.sortOrder = sortOrder;
+        this.lineIndices = new int[capacity];
+        this.idxsWithinLine = new int[capacity];
+        this.size = 0;
+        this.finalPoints = new long[finalLength];
+        this.finalSortValues = new double[finalLength];
+    }
+
+    public long[] getFinalPoints() {
+        return finalPoints;
+    }
+
+    public double[] getFinalSortValues() {
+        return finalSortValues;
+    }
+
+    /**
+     * merges <code>geoLines</code> into one sorted list of values representing the combined line.
+     */
+    public void merge() {
+        // 1. add first element of each sub line to heap
+        for (int i = 0; i < geoLines.size(); i++) {
+            if (geoLines.size() > 0) {
+                add(i, 0);
+            }
+        }
+
+        // 2. take lowest/greatest value from heap and re-insert the next value from the same sub-line that specific value was chosen from.
+
+        int i = 0;
+        while (i < finalPoints.length && size > 0) {
+            // take top from heap and place in finalLists
+            int lineIdx = lineIndices[0];
+            int idxInLine = idxsWithinLine[0];
+            finalPoints[i] = getTopPoint();
+            finalSortValues[i] = getTopSortValue();
+            removeTop();
+            InternalGeoLine lineChosen = geoLines.get(lineIdx);
+            if (idxInLine + 1 < lineChosen.line().length) {
+                add(lineIdx, idxInLine + 1);
+            }
+            i++;
+        }
+    }
+
+    private long getTopPoint() {
+        InternalGeoLine line = geoLines.get(lineIndices[0]);
+        return line.line()[idxsWithinLine[0]];
+    }
+
+    private double getTopSortValue() {
+        InternalGeoLine line = geoLines.get(lineIndices[0]);
+        return line.sortVals()[idxsWithinLine[0]];
+    }
+
+    private void removeTop() {
+        if (size == 0) {
+            throw new IllegalStateException();
+        }
+        lineIndices[0] = lineIndices[size - 1];
+        idxsWithinLine[0] = idxsWithinLine[size - 1];
+        size--;
+        heapifyDown();
+    }
+
+    private void add(int lineIndex, int idxWithinLine) {
+        if (size >= capacity) {
+            throw new IllegalStateException();
+        }
+        lineIndices[size] = lineIndex;
+        idxsWithinLine[size] = idxWithinLine;
+        size++;
+        heapifyUp();
+    }
+
+    private boolean correctOrdering(int i, int j) {
+        InternalGeoLine lineI = geoLines.get(lineIndices[i]);
+        InternalGeoLine lineJ = geoLines.get(lineIndices[j]);
+        double valI = lineI.sortVals()[idxsWithinLine[i]];
+        double valJ = lineJ.sortVals()[idxsWithinLine[j]];
+        if (SortOrder.ASC.equals(sortOrder)) {
+            return valI > valJ;
+        }
+        return valI < valJ;
+    }
+
+    private int getParentIndex(int i) {
+        return (i - 1) / 2;
+    }
+
+    private int getLeftChildIndex(int i) {
+        return 2 * i + 1;
+    }
+
+    private int getRightChildIndex(int i) {
+        return 2 * i + 2;
+    }
+
+    private boolean hasParent(int i) {
+        return i > 0;
+    }
+
+    private boolean hasLeftChild(int i) {
+        return getLeftChildIndex(i) < size;
+    }
+
+    private boolean hasRightChild(int i) {
+        return getRightChildIndex(i) < size;
+    }
+
+    private void heapifyUp() {
+        int i = size - 1;
+        while (hasParent(i) && correctOrdering(getParentIndex(i), i)) {
+            int parentIndex = getParentIndex(i);
+            swap(parentIndex, i);
+            i = parentIndex;
+        }
+    }
+
+    private void heapifyDown() {
+        int i = 0;
+        while (hasLeftChild(i)) {
+            int childIndex = getLeftChildIndex(i);
+            if (hasRightChild(i) && correctOrdering(getRightChildIndex(i), childIndex) == false) {
+                childIndex = getRightChildIndex(i);
+            }
+            if (correctOrdering(childIndex, i)) {
+                break;
+            } else {
+                swap(childIndex, i);
+                i = childIndex;
+            }
+        }
+    }
+
+    private void swap(int i, int j) {
+        int tmpLineIndex = lineIndices[i];
+        int tmpIdxWithinLine = idxsWithinLine[i];
+        lineIndices[i] = lineIndices[j];
+        idxsWithinLine[i] = idxsWithinLine[j];
+        lineIndices[j] = tmpLineIndex;
+        idxsWithinLine[j] = tmpIdxWithinLine;
+    }
+}

+ 55 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java

@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.util.IntroSorter;
+import org.elasticsearch.search.sort.SortOrder;
+
+/**
+ * An {@link IntroSorter} that sorts <code>points</code> and <code>sortValues</code> using the
+ */
+final class PathArraySorter extends IntroSorter {
+
+    private final long[] points;
+    private final double[] sortValues;
+    private double sortValuePivot;
+    private final SortOrder sortOrder;
+
+    PathArraySorter(long[] points, double[] sortValues, SortOrder sortOrder) {
+        assert points.length == sortValues.length;
+        this.points = points;
+        this.sortValues = sortValues;
+        this.sortValuePivot = 0;
+        this.sortOrder = sortOrder;
+    }
+
+    public void sort() {
+        sort(0, points.length);
+    }
+
+    @Override
+    protected void swap(int i, int j) {
+        final long tmpPoint = points[i];
+        points[i] = points[j];
+        points[j] = tmpPoint;
+        final double tmpSortValue = sortValues[i];
+        sortValues[i] = sortValues[j];
+        sortValues[j] = tmpSortValue;
+    }
+
+    @Override
+    protected void setPivot(int i) {
+        sortValuePivot = sortValues[i];
+    }
+
+    @Override
+    protected int comparePivot(int j) {
+        if (SortOrder.ASC.equals(sortOrder)) {
+            return Double.compare(sortValuePivot, sortValues[j]);
+        }
+        return Double.compare(sortValues[j], sortValuePivot);
+    }
+}

+ 60 - 0
x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/support/GeoLineMultiValuesSource.java

@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.spatial.search.aggregations.support;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.index.fielddata.MultiGeoPointValues;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.index.query.QueryShardContext;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.search.aggregations.support.MultiValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class GeoLineMultiValuesSource extends MultiValuesSource<ValuesSource> {
+    public GeoLineMultiValuesSource(Map<String, ValuesSourceConfig> valuesSourceConfigs, QueryShardContext context) {
+        values = new HashMap<>(valuesSourceConfigs.size());
+        for (Map.Entry<String, ValuesSourceConfig> entry : valuesSourceConfigs.entrySet()) {
+            final ValuesSource valuesSource = entry.getValue().getValuesSource();
+            if (valuesSource instanceof ValuesSource.Numeric == false
+                && valuesSource instanceof ValuesSource.GeoPoint == false) {
+                throw new AggregationExecutionException("ValuesSource type " + valuesSource.toString() +
+                    "is not supported for multi-valued aggregation");
+            }
+            values.put(entry.getKey(), valuesSource);
+        }
+    }
+
+    private ValuesSource getField(String fieldName) {
+        ValuesSource valuesSource = values.get(fieldName);
+        if (valuesSource == null) {
+            throw new IllegalArgumentException("Could not find field name [" + fieldName + "] in multiValuesSource");
+        }
+        return valuesSource;
+    }
+
+    public SortedNumericDoubleValues getNumericField(String fieldName, LeafReaderContext ctx) throws IOException {
+        ValuesSource valuesSource = getField(fieldName);
+        if (valuesSource instanceof ValuesSource.Numeric) {
+            return ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
+        }
+        throw new IllegalArgumentException("field [" + fieldName + "] is not a numeric type");
+    }
+
+    public MultiGeoPointValues getGeoPointField(String fieldName, LeafReaderContext ctx) {
+        ValuesSource valuesSource = getField(fieldName);
+        if (valuesSource instanceof ValuesSource.GeoPoint) {
+            return ((ValuesSource.GeoPoint) valuesSource).geoPointValues(ctx);
+        }
+        throw new IllegalArgumentException("field [" + fieldName + "] is not a geo_point type");
+    }
+
+}

+ 76 - 0
x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilderTests.java

@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class GeoLineAggregationBuilderTests extends AbstractSerializingTestCase<GeoLineAggregationBuilder> {
+
+    @Override
+    protected GeoLineAggregationBuilder doParseInstance(XContentParser parser) throws IOException {
+        assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
+        assertThat(parser.nextToken(), equalTo(XContentParser.Token.FIELD_NAME));
+        String name = parser.currentName();
+        assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
+        assertThat(parser.nextToken(), equalTo(XContentParser.Token.FIELD_NAME));
+        assertThat(parser.currentName(), equalTo(GeoLineAggregationBuilder.NAME));
+        GeoLineAggregationBuilder parsed = GeoLineAggregationBuilder.PARSER.apply(parser, name);
+        assertThat(parser.nextToken(), equalTo(XContentParser.Token.END_OBJECT));
+        assertThat(parser.nextToken(), equalTo(XContentParser.Token.END_OBJECT));
+        return parsed;
+    }
+
+    @Override
+    protected Writeable.Reader<GeoLineAggregationBuilder> instanceReader() {
+        return GeoLineAggregationBuilder::new;
+    }
+
+    @Override
+    protected GeoLineAggregationBuilder createTestInstance() {
+        MultiValuesSourceFieldConfig pointConfig = new MultiValuesSourceFieldConfig.Builder()
+            .setFieldName(randomAlphaOfLength(5))
+            .build();
+        MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder()
+            .setFieldName(randomAlphaOfLength(6)).build();
+        GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
+            .point(pointConfig)
+            .sort(sortConfig);
+        if (randomBoolean()) {
+            SortOrder sortOrder = randomFrom(SortOrder.values());
+            lineAggregationBuilder.sortOrder(sortOrder);
+        }
+        if (randomBoolean()) {
+            lineAggregationBuilder.size(randomIntBetween(1, GeoLineAggregationBuilder.MAX_PATH_SIZE));
+        }
+        if (randomBoolean()) {
+            lineAggregationBuilder.includeSort(randomBoolean());
+        }
+        return lineAggregationBuilder;
+    }
+
+    public void testInvalidSize() {
+        MultiValuesSourceFieldConfig pointConfig = new MultiValuesSourceFieldConfig.Builder()
+            .setFieldName(randomAlphaOfLength(5))
+            .build();
+        MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder()
+            .setFieldName(randomAlphaOfLength(6)).build();
+        GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
+            .point(pointConfig)
+            .sort(sortConfig);
+        expectThrows(IllegalArgumentException.class, () -> lineAggregationBuilder.size(0));
+        expectThrows(IllegalArgumentException.class,
+            () -> lineAggregationBuilder.size(GeoLineAggregationBuilder.MAX_PATH_SIZE + randomIntBetween(1, 10)));
+    }
+}

+ 229 - 0
x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java

@@ -0,0 +1,229 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.document.LatLonDocValuesField;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.geo.GeoEncodingUtils;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.NumericUtils;
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.geo.GeometryTestUtils;
+import org.elasticsearch.geometry.Point;
+import org.elasticsearch.index.mapper.GeoPointFieldMapper;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.spatial.SpatialPlugin;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class GeoLineAggregatorTests extends AggregatorTestCase {
+
+    @Override
+    protected List<SearchPlugin> getSearchPlugins() {
+        return Collections.singletonList(new SpatialPlugin());
+    }
+
+    // test that missing values are ignored
+    public void testMissingValues() throws IOException {
+        MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder()
+            .setFieldName("value_field")
+            .build();
+        MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build();
+        GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
+            .point(valueConfig)
+            .sortOrder(SortOrder.ASC)
+            .sort(sortConfig)
+            .size(10);
+
+        TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name")
+            .field("group_id")
+            .subAggregation(lineAggregationBuilder);
+
+        long lonLat = (((long) GeoEncodingUtils.encodeLongitude(90.0)) << 32) | GeoEncodingUtils.encodeLatitude(45.0) & 0xffffffffL;
+        //input
+        long[] points = new long[] {lonLat, 0, lonLat, 0,lonLat, lonLat, lonLat};
+        double[] sortValues = new double[]{1, 0, 2, 0, 3, 4, 5};
+        //expected
+        long[] expectedAggPoints = new long[] {lonLat, lonLat, lonLat, lonLat, lonLat};
+        double[] expectedAggSortValues = new double[]{
+            NumericUtils.doubleToSortableLong(1),
+            NumericUtils.doubleToSortableLong(2),
+            NumericUtils.doubleToSortableLong(3),
+            NumericUtils.doubleToSortableLong(4),
+            NumericUtils.doubleToSortableLong(5)
+        };
+
+        testCase(new MatchAllDocsQuery(), aggregationBuilder, iw -> {
+
+            for (int i = 0; i < points.length; i++) {
+                if (points[i] == 0) {
+                    // do not index value
+                    iw.addDocument(Collections.singletonList(new SortedDocValuesField("group_id", new BytesRef("group"))));
+                } else {
+                    iw.addDocument(Arrays.asList(new LatLonDocValuesField("value_field", 45.0, 90.0),
+                        new SortedNumericDocValuesField("sort_field", NumericUtils.doubleToSortableLong(sortValues[i])),
+                        new SortedDocValuesField("group_id", new BytesRef("group"))));
+                }
+            }
+        }, terms -> {
+            assertThat(terms.getBuckets().size(), equalTo(1));
+            InternalGeoLine geoLine = terms.getBuckets().get(0).getAggregations().get("_name");
+            assertThat(geoLine.length(), equalTo(5));
+            assertTrue(geoLine.isComplete());
+            assertArrayEquals(expectedAggPoints, geoLine.line());
+            assertArrayEquals(expectedAggSortValues, geoLine.sortVals(), 0d);
+        });
+    }
+
+    public void testAscending() throws IOException {
+        testAggregator(SortOrder.ASC);
+    }
+
+    public void testDescending() throws IOException {
+        testAggregator(SortOrder.DESC);
+    }
+
+    private void testAggregator(SortOrder sortOrder) throws IOException {
+        int size = randomIntBetween(1, GeoLineAggregationBuilder.MAX_PATH_SIZE);
+        MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder()
+            .setFieldName("value_field")
+            .build();
+        MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build();
+        GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
+            .point(valueConfig)
+            .sortOrder(sortOrder)
+            .sort(sortConfig)
+            .size(size);
+        TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name")
+            .field("group_id")
+            .subAggregation(lineAggregationBuilder);
+
+        int numGroups = randomIntBetween(1, 2);
+        Map<String, InternalGeoLine> lines = new HashMap<>(numGroups);
+        Map<Integer, long[]> indexedPoints = new HashMap<>(numGroups);
+        Map<Integer, double[]> indexedSortValues = new HashMap<>(numGroups);
+        for (int groupOrd = 0; groupOrd < numGroups; groupOrd++) {
+            int numPoints = randomIntBetween(2, 2 * size);
+            boolean complete = numPoints <= size;
+            long[] points = new long[numPoints];
+            double[] sortValues = new double[numPoints];
+            for (int i = 0; i < numPoints; i++) {
+                Point point = GeometryTestUtils.randomPoint(false);
+                int encodedLat = GeoEncodingUtils.encodeLatitude(point.getLat());
+                int encodedLon = GeoEncodingUtils.encodeLongitude(point.getLon());
+                long lonLat = (((long) encodedLon) << 32) | encodedLat & 0xffffffffL;
+                points[i] = lonLat;
+                sortValues[i] = SortOrder.ASC.equals(sortOrder) ? i : numPoints - i;
+            }
+            int lineSize = Math.min(numPoints, size);
+            // re-sort line to be ascending
+            long[] linePoints = Arrays.copyOf(points, lineSize);
+            double[] lineSorts = Arrays.copyOf(sortValues, lineSize);
+            new PathArraySorter(linePoints, lineSorts, SortOrder.ASC).sort();
+
+            lines.put(String.valueOf(groupOrd), new InternalGeoLine("_name",
+                linePoints, lineSorts, null, complete, true, sortOrder, size));
+
+            for (int i = 0; i < randomIntBetween(1, numPoints); i++) {
+                int idx1 = randomIntBetween(0, numPoints - 1);
+                int idx2 = randomIntBetween(0, numPoints - 1);
+                final long tmpPoint = points[idx1];
+                points[idx1] = points[idx2];
+                points[idx2] = tmpPoint;
+                final double tmpSortValue = sortValues[idx1];
+                sortValues[idx1] = sortValues[idx2];
+                sortValues[idx2] = tmpSortValue;
+            }
+            indexedPoints.put(groupOrd, points);
+            indexedSortValues.put(groupOrd, sortValues);
+        }
+
+
+        testCase(new MatchAllDocsQuery(), aggregationBuilder, iw -> {
+            for (int group = 0; group < numGroups; group++) {
+                long[] points = indexedPoints.get(group);
+                double[] sortValues = indexedSortValues.get(group);
+                for (int i = 0; i < points.length; i++) {
+                    int x = (int) (points[i] >> 32);
+                    int y = (int) points[i];
+                    iw.addDocument(Arrays.asList(new LatLonDocValuesField("value_field",
+                            GeoEncodingUtils.decodeLatitude(y),
+                            GeoEncodingUtils.decodeLongitude(x)),
+                        new SortedNumericDocValuesField("sort_field", NumericUtils.doubleToSortableLong(sortValues[i])),
+                        new SortedDocValuesField("group_id", new BytesRef(String.valueOf(group)))));
+                }
+            }
+        }, terms -> {
+            for (Terms.Bucket bucket : terms.getBuckets()) {
+                InternalGeoLine expectedGeoLine = lines.get(bucket.getKeyAsString());
+                InternalGeoLine geoLine = bucket.getAggregations().get("_name");
+                assertThat(geoLine.length(), equalTo(expectedGeoLine.length()));
+                assertThat(geoLine.isComplete(), equalTo(expectedGeoLine.isComplete()));
+                for (int i = 0; i < geoLine.sortVals().length; i++) {
+                    geoLine.sortVals()[i] = NumericUtils.sortableLongToDouble((long) geoLine.sortVals()[i]);
+                }
+                assertArrayEquals(expectedGeoLine.sortVals(), geoLine.sortVals(), 0d);
+                assertArrayEquals(expectedGeoLine.line(), geoLine.line());
+            }
+        });
+    }
+
+    private void testCase(Query query, TermsAggregationBuilder aggregationBuilder,
+                          CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
+                          Consumer<Terms> verify) throws IOException {
+        testCase(query, aggregationBuilder, buildIndex, verify, NumberFieldMapper.NumberType.LONG);
+    }
+
+    private void testCase(Query query, TermsAggregationBuilder aggregationBuilder,
+                          CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
+                          Consumer<Terms> verify,
+                          NumberFieldMapper.NumberType fieldNumberType) throws IOException {
+
+        Directory directory = newDirectory();
+        RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
+        buildIndex.accept(indexWriter);
+        indexWriter.close();
+        IndexReader indexReader = DirectoryReader.open(directory);
+        IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+        try {
+            MappedFieldType fieldType = new GeoPointFieldMapper.GeoPointFieldType("value_field");
+            MappedFieldType groupFieldType = new KeywordFieldMapper.KeywordFieldType("group_id");
+            MappedFieldType fieldType2 = new NumberFieldMapper.NumberFieldType("sort_field", fieldNumberType);
+
+            Terms terms = searchAndReduce(indexSearcher, new MatchAllDocsQuery(), aggregationBuilder,
+                fieldType, fieldType2, groupFieldType);
+            verify.accept(terms);
+        } finally {
+            indexReader.close();
+            directory.close();
+        }
+    }
+}

+ 172 - 0
x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLineTests.java

@@ -0,0 +1,172 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.ParsedAggregation;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.InternalAggregationTestCase;
+import org.elasticsearch.xpack.spatial.SpatialPlugin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class InternalGeoLineTests extends InternalAggregationTestCase<InternalGeoLine> {
+
+    @Override
+    protected SearchPlugin registerPlugin() {
+        return new SpatialPlugin();
+    }
+
+    static InternalGeoLine randomInstance(String name, Map<String, Object> metadata, int size, SortOrder sortOrder, double magicDecimal) {
+        int length = randomIntBetween(2, size);
+        long[] points = new long[length];
+        double[] sortVals = new double[length];
+        for (int i = 0; i < length; i++) {
+            points[i] = randomNonNegativeLong();
+            sortVals[i] = i + magicDecimal;
+        }
+        Arrays.sort(sortVals);
+        if (SortOrder.DESC.equals(sortOrder)) {
+            // reverse the list
+            for (int i = 0, j = sortVals.length - 1; i < j; i++, j--) {
+                double tmp = sortVals[i];
+                sortVals[i] = sortVals[j];
+                sortVals[j] = tmp;
+            }
+        }
+        boolean complete = length <= size;
+        return new InternalGeoLine(name, points, sortVals, metadata, complete, randomBoolean(), sortOrder, size);
+    }
+
+    @Override
+    protected InternalGeoLine createTestInstance(String name, Map<String, Object> metadata) {
+        int size = randomIntBetween(10, GeoLineAggregationBuilder.MAX_PATH_SIZE);
+        return randomInstance(name, metadata, size, randomFrom(SortOrder.values()), randomDoubleBetween(0, 1, false));
+    }
+
+    @Override
+    protected InternalGeoLine mutateInstance(InternalGeoLine instance) {
+        String name = instance.getName();
+        long[] line = Arrays.copyOf(instance.line(), instance.line().length);
+        double[] sortVals = Arrays.copyOf(instance.sortVals(), instance.sortVals().length);
+        Map<String, Object> metadata = instance.getMetadata();
+        boolean complete = instance.isComplete();
+        boolean includeSorts = instance.includeSorts();
+        SortOrder sortOrder = instance.sortOrder();
+        int size = instance.size();
+        switch (randomIntBetween(0, 7)) {
+            case 0:
+                name += randomAlphaOfLength(5);
+                break;
+            case 1:
+                line[0] = line[0] + 1000000L;
+                break;
+            case 2:
+                sortVals[0] = sortVals[0] + 10000;
+                break;
+            case 3:
+                if (metadata == null) {
+                    metadata = new HashMap<>(1);
+                } else {
+                    metadata = new HashMap<>(instance.getMetadata());
+                }
+                metadata.put(randomAlphaOfLength(15), randomInt());
+                break;
+            case 4:
+                complete = !complete;
+                break;
+            case 5:
+                includeSorts = !includeSorts;
+                break;
+            case 6:
+                sortOrder = SortOrder.ASC.equals(sortOrder) ? SortOrder.DESC : SortOrder.ASC;
+                break;
+            case 7:
+                size = size + 1;
+                break;
+            default:
+                throw new AssertionError("Illegal randomisation branch");
+        }
+        return new InternalGeoLine(name, line, sortVals, metadata, complete, includeSorts, sortOrder, size);
+    }
+
+    @Override
+    protected List<InternalGeoLine> randomResultsToReduce(String name, int size) {
+        SortOrder sortOrder = randomFrom(SortOrder.values());
+        int maxLineLength = randomIntBetween(10, GeoLineAggregationBuilder.MAX_PATH_SIZE);
+        List<InternalGeoLine> instances = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            // use the magicDecimal to have absolute ordering between heap-sort and testing array sorting
+            instances.add(randomInstance(name, null, maxLineLength, sortOrder, ((double) i) / size));
+        }
+        return instances;
+    }
+
+    @Override
+    protected void assertReduced(InternalGeoLine reduced, List<InternalGeoLine> inputs) {
+        int mergedLength = 0;
+        for (InternalGeoLine subLine : inputs) {
+            mergedLength += subLine.length();
+        }
+        boolean complete = mergedLength <= reduced.size();
+        int expectedReducedLength = Math.min(mergedLength, reduced.size());
+        assertThat(reduced.length(), equalTo(expectedReducedLength));
+        assertThat(complete, equalTo(reduced.isComplete()));
+
+        // check arrays
+        long[] finalList = new long[mergedLength];
+        double[] finalSortVals = new double[mergedLength];
+        int idx = 0;
+        for (InternalGeoLine geoLine : inputs) {
+            for (int i = 0; i < geoLine.line().length; i++) {
+                finalSortVals[idx] = geoLine.sortVals()[i];
+                finalList[idx] = geoLine.line()[i];
+                idx += 1;
+            }
+        }
+
+        new PathArraySorter(finalList, finalSortVals, reduced.sortOrder()).sort();
+
+        // cap to max length
+        long[] finalCappedPoints = Arrays.copyOf(finalList, Math.min(reduced.size(), mergedLength));
+        double[] finalCappedSortVals = Arrays.copyOf(finalSortVals, Math.min(reduced.size(), mergedLength));
+
+        if (SortOrder.DESC.equals(reduced.sortOrder())) {
+            new PathArraySorter(finalCappedPoints, finalCappedSortVals, SortOrder.ASC).sort();
+        }
+
+        assertArrayEquals(finalCappedSortVals, reduced.sortVals(), 0d);
+        assertArrayEquals(finalCappedPoints, reduced.line());
+    }
+
+    @Override
+    protected void assertFromXContent(InternalGeoLine aggregation, ParsedAggregation parsedAggregation) throws IOException {
+        // There is no ParsedGeoLine yet so we cannot test it here
+    }
+
+    @Override
+    protected List<NamedXContentRegistry.Entry> getNamedXContents() {
+        List<NamedXContentRegistry.Entry> extendedNamedXContents = new ArrayList<>(super.getNamedXContents());
+        extendedNamedXContents.add(new NamedXContentRegistry.Entry(Aggregation.class,
+            new ParseField(GeoLineAggregationBuilder.NAME),
+            (p, c) -> {
+                assumeTrue("There is no ParsedGeoLine yet", false);
+                return null;
+            }
+        ));
+        return extendedNamedXContents;
+    }
+}

+ 52 - 0
x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLinesTests.java

@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class MergedGeoLinesTests extends ESTestCase {
+
+    public InternalGeoLine randomLine(SortOrder sortOrder, int maxLength, double magicDecimal) {
+        String name = randomAlphaOfLength(5);
+        int length = randomBoolean() ? maxLength : randomIntBetween(1, maxLength);
+        boolean complete = length < maxLength;
+        long[] points = new long[length];
+        double[] sortValues = new double[length];
+        for (int i = 0; i < length; i++) {
+            points[i] = randomIntBetween(1, 100);
+            sortValues[i] = i + magicDecimal;
+        }
+        return new InternalGeoLine(name, points, sortValues, Collections.emptyMap(), complete, randomBoolean(), sortOrder, maxLength);
+    }
+
+    public void testSimpleMerge() {
+        int numLines = 10;
+        int maxLength = 100;
+        int finalLength = 0;
+        SortOrder sortOrder = SortOrder.ASC;
+        List<InternalGeoLine> geoLines = new ArrayList<>();
+        for (int i = 0; i < numLines; i++) {
+            geoLines.add(randomLine(sortOrder, maxLength, ((double) i) / numLines));
+            finalLength += geoLines.get(i).length();
+        }
+        finalLength = Math.min(maxLength, finalLength);
+        MergedGeoLines mergedGeoLines = new MergedGeoLines(geoLines, finalLength, sortOrder);
+        mergedGeoLines.merge();
+
+        // assert that the mergedGeoLines are sorted (does not necessarily validate correctness, but it is a good heuristic)
+        long[] sortedPoints = Arrays.copyOf(mergedGeoLines.getFinalPoints(), mergedGeoLines.getFinalPoints().length);
+        double[] sortedValues = Arrays.copyOf(mergedGeoLines.getFinalSortValues(), mergedGeoLines.getFinalSortValues().length);
+        new PathArraySorter(sortedPoints, sortedValues, sortOrder).sort();
+        assertArrayEquals(sortedValues, mergedGeoLines.getFinalSortValues(), 0d);
+        assertArrayEquals(sortedPoints, mergedGeoLines.getFinalPoints());
+    }
+}

+ 59 - 0
x-pack/plugin/spatial/src/test/resources/rest-api-spec/test/50_geoline.yml

@@ -0,0 +1,59 @@
+---
+"Test geoline agg":
+  - do:
+      indices.create:
+        index: locations
+        body:
+          mappings:
+            properties:
+              location:
+                type: geo_point
+              rank:
+                type: double
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - index:
+              _index: locations
+              _id: 1
+          - '{"location": [13.37139831, 47.82930284], "rank": 2.0 }'
+          - index:
+              _index: locations
+              _id: 2
+          - '{"location": [13.3784208402, 47.88832084022], "rank": 0.0 }'
+          - index:
+              _index: locations
+              _id: 3
+          - '{"location": [13.371830148701, 48.2084200148], "rank": 1.2 }'
+
+  - do:
+      search:
+        rest_total_hits_as_int: true
+        index: locations
+        size: 0
+        body:
+          aggs:
+            path:
+              geo_line:
+                include_sort: true
+                geo_point:
+                  field: location
+                sort:
+                  field: rank
+  - match: { hits.total:  3 }
+  - match: { aggregations.path.type: "Feature" }
+  - match: { aggregations.path.geometry.type: "LineString" }
+  - length: { aggregations.path.geometry.coordinates: 3 }
+  - match: { aggregations.path.geometry.coordinates.0.0: 13.378421 }
+  - match: { aggregations.path.geometry.coordinates.0.1: 47.888321 }
+  - match: { aggregations.path.geometry.coordinates.1.0: 13.37183 }
+  - match: { aggregations.path.geometry.coordinates.1.1: 48.20842 }
+  - match: { aggregations.path.geometry.coordinates.2.0: 13.371398 }
+  - match: { aggregations.path.geometry.coordinates.2.1: 47.829303 }
+  - is_true: aggregations.path.properties.complete
+  - length: { aggregations.path.properties.sort_values: 3 }
+  - match: { aggregations.path.properties.sort_values.0: 0.0 }
+  - match: { aggregations.path.properties.sort_values.1: 1.2 }
+  - match: { aggregations.path.properties.sort_values.2: 2.0 }

+ 51 - 0
x-pack/plugin/spatial/src/yamlRestTest/resources/rest-api-spec/test/60_geo_line.yml

@@ -0,0 +1,51 @@
+---
+"Test geo_line aggregation on geo points":
+  - do:
+      indices.create:
+        index: races
+        body:
+          mappings:
+            properties:
+              race_id:
+                type: keyword
+              position:
+                type: geo_point
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - index:
+              _index: races
+              _id: 1
+          - '{"position": "POINT(4.912350 52.374081)", "race_id": "Amsterdam", "timestamp": 4}'
+          - index:
+              _index:  races
+              _id: 2
+          - '{"position": "POINT(4.901618 52.369219)", "race_id": "Amsterdam", "timestamp": 3}'
+          - index:
+              _index: races
+              _id: 3
+          - '{"position": "POINT(4.914722 52.371667)", "race_id": "Amsterdam", "timestamp": 10}'
+
+  - do:
+      search:
+        rest_total_hits_as_int: true
+        index: races
+        size: 0
+        body:
+          aggs:
+            trace:
+              geo_line:
+                point:
+                  field: position
+                sort:
+                  field: timestamp
+  - match: { hits.total: 3 }
+  - match: { aggregations.trace.type: "Feature" }
+  - match: { aggregations.trace.geometry.type: "LineString" }
+  - length: { aggregations.trace.geometry.coordinates: 3 }
+  - match: { aggregations.trace.geometry.coordinates.0: [4.901618, 52.369219] }
+  - match: { aggregations.trace.geometry.coordinates.1: [4.91235, 52.374081] }
+  - match: { aggregations.trace.geometry.coordinates.2: [4.914722, 52.371667] }
+  - is_true: aggregations.trace.properties.complete