|
@@ -19,10 +19,11 @@
|
|
|
package org.elasticsearch.search.aggregations.bucket.geogrid;
|
|
|
|
|
|
import org.apache.lucene.index.LeafReaderContext;
|
|
|
-import org.apache.lucene.index.SortedNumericDocValues;
|
|
|
import org.apache.lucene.search.ScoreMode;
|
|
|
+import org.elasticsearch.common.geo.GeoPoint;
|
|
|
import org.elasticsearch.common.lease.Releasables;
|
|
|
import org.elasticsearch.common.util.LongHash;
|
|
|
+import org.elasticsearch.index.fielddata.MultiGeoPointValues;
|
|
|
import org.elasticsearch.search.aggregations.Aggregator;
|
|
|
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
|
|
import org.elasticsearch.search.aggregations.InternalAggregations;
|
|
@@ -30,6 +31,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector;
|
|
|
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
|
|
|
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
|
|
|
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
|
|
+import org.elasticsearch.search.aggregations.support.ValuesSource;
|
|
|
import org.elasticsearch.search.internal.SearchContext;
|
|
|
|
|
|
import java.io.IOException;
|
|
@@ -45,14 +47,19 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
|
|
|
|
|
|
protected final int requiredSize;
|
|
|
protected final int shardSize;
|
|
|
- protected final CellIdSource valuesSource;
|
|
|
+ protected final ValuesSource.GeoPoint valuesSource;
|
|
|
+ protected final int precision;
|
|
|
+ protected final GeoPointLongEncoder longEncoder;
|
|
|
protected final LongHash bucketOrds;
|
|
|
|
|
|
- GeoGridAggregator(String name, AggregatorFactories factories, CellIdSource valuesSource,
|
|
|
+ GeoGridAggregator(String name, AggregatorFactories factories, ValuesSource.GeoPoint valuesSource,
|
|
|
+ int precision, GeoPointLongEncoder longEncoder,
|
|
|
int requiredSize, int shardSize, SearchContext aggregationContext, Aggregator parent,
|
|
|
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
|
|
|
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
|
|
|
this.valuesSource = valuesSource;
|
|
|
+ this.precision = precision;
|
|
|
+ this.longEncoder = longEncoder;
|
|
|
this.requiredSize = requiredSize;
|
|
|
this.shardSize = shardSize;
|
|
|
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
|
|
@@ -69,7 +76,7 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
|
|
|
@Override
|
|
|
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
|
|
|
final LeafBucketCollector sub) throws IOException {
|
|
|
- final SortedNumericDocValues values = valuesSource.longValues(ctx);
|
|
|
+ final MultiGeoPointValues values = valuesSource.geoPointValues(ctx);
|
|
|
return new LeafBucketCollectorBase(sub, null) {
|
|
|
@Override
|
|
|
public void collect(int doc, long bucket) throws IOException {
|
|
@@ -79,7 +86,8 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
|
|
|
|
|
|
long previous = Long.MAX_VALUE;
|
|
|
for (int i = 0; i < valuesCount; ++i) {
|
|
|
- final long val = values.nextValue();
|
|
|
+ final GeoPoint point = values.nextValue();
|
|
|
+ final long val = longEncoder.encode(point.getLon(), point.getLat(), precision);
|
|
|
if (previous != val || i == 0) {
|
|
|
long bucketOrdinal = bucketOrds.add(val);
|
|
|
if (bucketOrdinal < 0) { // already seen
|
|
@@ -189,4 +197,12 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
|
|
|
Releasables.close(bucketOrds);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * The encoder to use to convert a geopoint's (lon, lat, precision) into
|
|
|
+ * a long-encoded bucket key for aggregating.
|
|
|
+ */
|
|
|
+ @FunctionalInterface
|
|
|
+ public interface GeoPointLongEncoder {
|
|
|
+ long encode(double lon, double lat, int precision);
|
|
|
+ }
|
|
|
}
|