|
@@ -22,13 +22,13 @@ import org.apache.lucene.index.LeafReaderContext;
|
|
|
import org.apache.lucene.index.SortedNumericDocValues;
|
|
import org.apache.lucene.index.SortedNumericDocValues;
|
|
|
import org.apache.lucene.search.ScoreMode;
|
|
import org.apache.lucene.search.ScoreMode;
|
|
|
import org.elasticsearch.common.lease.Releasables;
|
|
import org.elasticsearch.common.lease.Releasables;
|
|
|
-import org.elasticsearch.common.util.LongHash;
|
|
|
|
|
import org.elasticsearch.search.aggregations.Aggregator;
|
|
import org.elasticsearch.search.aggregations.Aggregator;
|
|
|
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
|
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
|
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
|
import org.elasticsearch.search.aggregations.InternalAggregation;
|
|
|
import org.elasticsearch.search.aggregations.LeafBucketCollector;
|
|
import org.elasticsearch.search.aggregations.LeafBucketCollector;
|
|
|
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
|
|
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
|
|
|
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
|
|
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
|
|
|
|
|
+import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
|
|
|
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
|
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
|
|
import org.elasticsearch.search.internal.SearchContext;
|
|
import org.elasticsearch.search.internal.SearchContext;
|
|
|
|
|
|
|
@@ -46,17 +46,16 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
|
|
|
protected final int requiredSize;
|
|
protected final int requiredSize;
|
|
|
protected final int shardSize;
|
|
protected final int shardSize;
|
|
|
protected final ValuesSource.Numeric valuesSource;
|
|
protected final ValuesSource.Numeric valuesSource;
|
|
|
- protected final LongHash bucketOrds;
|
|
|
|
|
- protected SortedNumericDocValues values;
|
|
|
|
|
|
|
+ protected final LongKeyedBucketOrds bucketOrds;
|
|
|
|
|
|
|
|
GeoGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
|
|
GeoGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
|
|
|
int requiredSize, int shardSize, SearchContext aggregationContext,
|
|
int requiredSize, int shardSize, SearchContext aggregationContext,
|
|
|
- Aggregator parent, Map<String, Object> metadata) throws IOException {
|
|
|
|
|
|
|
+ Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
|
|
|
super(name, factories, aggregationContext, parent, metadata);
|
|
super(name, factories, aggregationContext, parent, metadata);
|
|
|
this.valuesSource = valuesSource;
|
|
this.valuesSource = valuesSource;
|
|
|
this.requiredSize = requiredSize;
|
|
this.requiredSize = requiredSize;
|
|
|
this.shardSize = shardSize;
|
|
this.shardSize = shardSize;
|
|
|
- bucketOrds = new LongHash(1, aggregationContext.bigArrays());
|
|
|
|
|
|
|
+ bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -70,11 +69,10 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
|
|
|
@Override
|
|
@Override
|
|
|
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
|
|
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
|
|
|
final LeafBucketCollector sub) throws IOException {
|
|
final LeafBucketCollector sub) throws IOException {
|
|
|
- values = valuesSource.longValues(ctx);
|
|
|
|
|
|
|
+ SortedNumericDocValues values = valuesSource.longValues(ctx);
|
|
|
return new LeafBucketCollectorBase(sub, null) {
|
|
return new LeafBucketCollectorBase(sub, null) {
|
|
|
@Override
|
|
@Override
|
|
|
- public void collect(int doc, long bucket) throws IOException {
|
|
|
|
|
- assert bucket == 0;
|
|
|
|
|
|
|
+ public void collect(int doc, long owningBucketOrd) throws IOException {
|
|
|
if (values.advanceExact(doc)) {
|
|
if (values.advanceExact(doc)) {
|
|
|
final int valuesCount = values.docValueCount();
|
|
final int valuesCount = values.docValueCount();
|
|
|
|
|
|
|
@@ -82,7 +80,7 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
|
|
|
for (int i = 0; i < valuesCount; ++i) {
|
|
for (int i = 0; i < valuesCount; ++i) {
|
|
|
final long val = values.nextValue();
|
|
final long val = values.nextValue();
|
|
|
if (previous != val || i == 0) {
|
|
if (previous != val || i == 0) {
|
|
|
- long bucketOrdinal = bucketOrds.add(val);
|
|
|
|
|
|
|
+ long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
|
|
|
if (bucketOrdinal < 0) { // already seen
|
|
if (bucketOrdinal < 0) { // already seen
|
|
|
bucketOrdinal = -1 - bucketOrdinal;
|
|
bucketOrdinal = -1 - bucketOrdinal;
|
|
|
collectExistingBucket(sub, doc, bucketOrdinal);
|
|
collectExistingBucket(sub, doc, bucketOrdinal);
|
|
@@ -108,31 +106,38 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
|
|
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
|
|
|
- assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
|
|
|
|
|
- final int size = (int) Math.min(bucketOrds.size(), shardSize);
|
|
|
|
|
- consumeBucketsAndMaybeBreak(size);
|
|
|
|
|
-
|
|
|
|
|
- BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
|
|
|
|
|
- InternalGeoGridBucket spare = null;
|
|
|
|
|
- for (long i = 0; i < bucketOrds.size(); i++) {
|
|
|
|
|
- if (spare == null) {
|
|
|
|
|
- spare = newEmptyBucket();
|
|
|
|
|
|
|
+ InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
|
|
|
|
|
+ for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
|
|
|
|
|
+ int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
|
|
|
|
|
+ consumeBucketsAndMaybeBreak(size);
|
|
|
|
|
+
|
|
|
|
|
+ BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
|
|
|
|
|
+ InternalGeoGridBucket spare = null;
|
|
|
|
|
+ LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
|
|
|
|
|
+ while (ordsEnum.next()) {
|
|
|
|
|
+ if (spare == null) {
|
|
|
|
|
+ spare = newEmptyBucket();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // need a special function to keep the source bucket
|
|
|
|
|
+ // up-to-date so it can get the appropriate key
|
|
|
|
|
+ spare.hashAsLong = ordsEnum.value();
|
|
|
|
|
+ spare.docCount = bucketDocCount(ordsEnum.ord());
|
|
|
|
|
+ spare.bucketOrd = ordsEnum.ord();
|
|
|
|
|
+ spare = ordered.insertWithOverflow(spare);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
|
|
|
|
|
+ for (int i = ordered.size() - 1; i >= 0; --i) {
|
|
|
|
|
+ topBucketsPerOrd[ordIdx][i] = ordered.pop();
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- // need a special function to keep the source bucket
|
|
|
|
|
- // up-to-date so it can get the appropriate key
|
|
|
|
|
- spare.hashAsLong = bucketOrds.get(i);
|
|
|
|
|
- spare.docCount = bucketDocCount(i);
|
|
|
|
|
- spare.bucketOrd = i;
|
|
|
|
|
- spare = ordered.insertWithOverflow(spare);
|
|
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
|
|
|
|
|
- for (int i = ordered.size() - 1; i >= 0; --i) {
|
|
|
|
|
- list[i] = ordered.pop();
|
|
|
|
|
|
|
+ buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
|
|
|
|
|
+ InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
|
|
|
|
|
+ for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
|
|
|
|
|
+ results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd[ordIdx]), metadata());
|
|
|
}
|
|
}
|
|
|
- buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
|
|
|
|
|
- return new InternalAggregation[] {buildAggregation(name, requiredSize, Arrays.asList(list), metadata())};
|
|
|
|
|
|
|
+ return results;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|