Bläddra i källkod

Add a TSID global ordinal to TimeSeriesIndexSearcher (#90035)

Rather than trying to compare BytesRefs in tsdb-related aggregations, it
will be much quicker if we can use a search-global ordinal to detect when
we have moved to a new TSID. This commit adds such an ordinal to the
aggregation execution context.
Alan Woodward 3 år sedan
förälder
incheckning
aed64a6c76

+ 5 - 0
docs/changelog/90035.yaml

@@ -0,0 +1,5 @@
+pr: 90035
+summary: Add a TSID global ordinal to `TimeSeriesIndexSearcher`
+area: TSDB
+type: feature
+issues: []

+ 3 - 1
modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java

@@ -122,7 +122,9 @@ public abstract class ParentJoinAggregator extends BucketsAggregator implements
                 continue;
             }
             DocIdSetIterator childDocsIter = childDocsScorer.iterator();
-            final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(new AggregationExecutionContext(ctx, null, null));
+            final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(
+                new AggregationExecutionContext(ctx, null, null, null)
+            );
 
             final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
             // Set the scorer, since we now replay only the child docIds

+ 14 - 5
server/src/main/java/org/elasticsearch/search/aggregations/AggregationExecutionContext.java

@@ -11,6 +11,8 @@ package org.elasticsearch.search.aggregations;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.util.BytesRef;
 
+import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 /**
@@ -21,18 +23,21 @@ import java.util.function.Supplier;
  */
 public class AggregationExecutionContext {
 
-    private final Supplier<BytesRef> tsidProvider;
-    private final Supplier<Long> timestampProvider;
+    private final Supplier<BytesRef> tsidProvider;  // TODO remove this entirely?
+    private final LongSupplier timestampProvider;
+    private final IntSupplier tsidOrdProvider;
     private final LeafReaderContext leafReaderContext;
 
     public AggregationExecutionContext(
         LeafReaderContext leafReaderContext,
         Supplier<BytesRef> tsidProvider,
-        Supplier<Long> timestampProvider
+        LongSupplier timestampProvider,
+        IntSupplier tsidOrdProvider
     ) {
         this.leafReaderContext = leafReaderContext;
         this.tsidProvider = tsidProvider;
         this.timestampProvider = timestampProvider;
+        this.tsidOrdProvider = tsidOrdProvider;
     }
 
     public LeafReaderContext getLeafReaderContext() {
@@ -43,7 +48,11 @@ public class AggregationExecutionContext {
         return tsidProvider != null ? tsidProvider.get() : null;
     }
 
-    public Long getTimestamp() {
-        return timestampProvider.get();
+    public long getTimestamp() {
+        return timestampProvider.getAsLong();
+    }
+
+    public int getTsidOrd() {
+        return tsidOrdProvider.getAsInt();
     }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/BucketCollector.java

@@ -83,7 +83,7 @@ public abstract class BucketCollector {
 
         @Override
         public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
-            return bucketCollector.getLeafCollector(new AggregationExecutionContext(context, null, null));
+            return bucketCollector.getLeafCollector(new AggregationExecutionContext(context, null, null, null));
         }
 
         @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java

@@ -42,7 +42,7 @@ public class TimeSeriesAggregator extends BucketsAggregator {
         CardinalityUpperBound bucketCardinality,
         Map<String, Object> metadata
     ) throws IOException {
-        super(name, factories, context, parent, bucketCardinality, metadata);
+        super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
         this.keyed = keyed;
         bucketOrds = BytesKeyedBucketOrds.build(bigArrays(), bucketCardinality);
     }

+ 8 - 4
server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcher.java

@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.IntSupplier;
 
 import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT;
 
@@ -65,6 +66,7 @@ public class TimeSeriesIndexSearcher {
         int seen = 0;
         query = searcher.rewrite(query);
         Weight weight = searcher.createWeight(query, bucketCollector.scoreMode(), 1);
+        int[] tsidOrd = new int[1];
 
         // Create LeafWalker for each subreader
         List<LeafWalker> leafWalkers = new ArrayList<>();
@@ -74,7 +76,7 @@ public class TimeSeriesIndexSearcher {
             }
             Scorer scorer = weight.scorer(leaf);
             if (scorer != null) {
-                LeafWalker leafWalker = new LeafWalker(leaf, scorer, bucketCollector, leaf);
+                LeafWalker leafWalker = new LeafWalker(leaf, scorer, bucketCollector, () -> tsidOrd[0]);
                 if (leafWalker.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                     leafWalkers.add(leafWalker);
                 }
@@ -83,7 +85,7 @@ public class TimeSeriesIndexSearcher {
                 // this is needed to trigger actions in some bucketCollectors that bypass the normal iteration logic
                 // for example, global aggregator triggers a separate iterator that ignores the query but still needs
                 // to know all leaves
-                bucketCollector.getLeafCollector(new AggregationExecutionContext(leaf, null, null));
+                bucketCollector.getLeafCollector(new AggregationExecutionContext(leaf, null, null, null));
             }
         }
 
@@ -115,6 +117,7 @@ public class TimeSeriesIndexSearcher {
                     queue.updateTop();
                 }
             } while (queue.size() > 0);
+            tsidOrd[0]++;
         }
     }
 
@@ -178,8 +181,9 @@ public class TimeSeriesIndexSearcher {
         int tsidOrd;
         long timestamp;
 
-        LeafWalker(LeafReaderContext context, Scorer scorer, BucketCollector bucketCollector, LeafReaderContext leaf) throws IOException {
-            AggregationExecutionContext aggCtx = new AggregationExecutionContext(leaf, scratch::get, () -> timestamp);
+        LeafWalker(LeafReaderContext context, Scorer scorer, BucketCollector bucketCollector, IntSupplier tsidOrdSupplier)
+            throws IOException {
+            AggregationExecutionContext aggCtx = new AggregationExecutionContext(context, scratch::get, () -> timestamp, tsidOrdSupplier);
             this.collector = bucketCollector.getLeafCollector(aggCtx);
             liveDocs = context.reader().getLiveDocs();
             this.collector.setScorer(scorer);

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/MultiBucketCollectorTests.java

@@ -213,7 +213,7 @@ public class MultiBucketCollectorTests extends ESTestCase {
                         for (Map.Entry<TotalHitCountBucketCollector, Integer> expectedCount : expectedCounts.entrySet()) {
                             shouldNoop &= expectedCount.getValue().intValue() <= expectedCount.getKey().getTotalHits();
                         }
-                        LeafBucketCollector collector = wrapped.getLeafCollector(new AggregationExecutionContext(ctx, null, null));
+                        LeafBucketCollector collector = wrapped.getLeafCollector(new AggregationExecutionContext(ctx, null, null, null));
                         assertThat(collector.isNoop(), equalTo(shouldNoop));
                         if (false == collector.isNoop()) {
                             for (int docId = 0; docId < ctx.reader().numDocs(); docId++) {

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollectorTests.java

@@ -210,7 +210,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase {
                     @Override
                     public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
                         LeafBucketCollector delegate = deferringCollector.getLeafCollector(
-                            new AggregationExecutionContext(context, null, null)
+                            new AggregationExecutionContext(context, null, null, null)
                         );
                         return leafCollector.apply(deferringCollector, delegate);
                     }

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorTests.java

@@ -111,7 +111,7 @@ public class FilterAggregatorTests extends AggregatorTestCase {
                 FilterAggregator agg = createAggregator(builder, indexSearcher, fieldType);
                 agg.preCollection();
                 LeafBucketCollector collector = agg.getLeafCollector(
-                    new AggregationExecutionContext(indexReader.leaves().get(0), null, null)
+                    new AggregationExecutionContext(indexReader.leaves().get(0), null, null, null)
                 );
                 collector.collect(0, 0);
                 collector.collect(0, 0);

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java

@@ -1633,7 +1633,7 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
     private Map<String, Object> collectAndGetFilterDebugInfo(IndexSearcher searcher, Aggregator aggregator) throws IOException {
         aggregator.preCollection();
         for (LeafReaderContext ctx : searcher.getIndexReader().leaves()) {
-            LeafBucketCollector leafCollector = aggregator.getLeafCollector(new AggregationExecutionContext(ctx, null, null));
+            LeafBucketCollector leafCollector = aggregator.getLeafCollector(new AggregationExecutionContext(ctx, null, null, null));
             assertTrue(leafCollector.isNoop());
         }
         Map<String, Object> debug = new HashMap<>();

+ 11 - 5
server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java

@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.index.IndexSortConfig.TIME_SERIES_SORT;
+import static org.hamcrest.Matchers.greaterThan;
 
 public class TimeSeriesIndexSearcherTests extends ESTestCase {
 
@@ -175,9 +176,10 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase {
     private BucketCollector getBucketCollector(long totalCount) {
         return new BucketCollector() {
 
-            boolean tsidReverse = TIME_SERIES_SORT[0].getOrder() == SortOrder.DESC;
-            boolean timestampReverse = TIME_SERIES_SORT[1].getOrder() == SortOrder.DESC;
+            final boolean tsidReverse = TIME_SERIES_SORT[0].getOrder() == SortOrder.DESC;
+            final boolean timestampReverse = TIME_SERIES_SORT[1].getOrder() == SortOrder.DESC;
             BytesRef currentTSID = null;
+            int currentTSIDord = -1;
             long currentTimestamp = 0;
             long total = 0;
 
@@ -197,7 +199,7 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase {
                         BytesRef latestTSID = tsid.lookupOrd(tsid.ordValue());
                         long latestTimestamp = timestamp.longValue();
                         assertEquals(latestTSID, aggCtx.getTsid());
-                        assertEquals(latestTimestamp, aggCtx.getTimestamp().longValue());
+                        assertEquals(latestTimestamp, aggCtx.getTimestamp());
 
                         if (currentTSID != null) {
                             assertTrue(
@@ -209,22 +211,26 @@ public class TimeSeriesIndexSearcherTests extends ESTestCase {
                                     currentTimestamp + "->" + latestTimestamp,
                                     timestampReverse ? latestTimestamp <= currentTimestamp : latestTimestamp >= currentTimestamp
                                 );
+                                assertEquals(currentTSIDord, aggCtx.getTsidOrd());
+                            } else {
+                                assertThat(aggCtx.getTsidOrd(), greaterThan(currentTSIDord));
                             }
                         }
                         currentTimestamp = latestTimestamp;
                         currentTSID = BytesRef.deepCopyOf(latestTSID);
+                        currentTSIDord = aggCtx.getTsidOrd();
                         total++;
                     }
                 };
             }
 
             @Override
-            public void preCollection() throws IOException {
+            public void preCollection() {
 
             }
 
             @Override
-            public void postCollection() throws IOException {
+            public void postCollection() {
                 assertEquals(totalCount, total);
             }
 

+ 1 - 1
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java

@@ -357,7 +357,7 @@ public class TopMetricsAggregatorTests extends AggregatorTestCase {
                 aggregator.preCollection();
                 assertThat(indexReader.leaves(), hasSize(1));
                 LeafBucketCollector leaf = aggregator.getLeafCollector(
-                    new AggregationExecutionContext(indexReader.leaves().get(0), null, null)
+                    new AggregationExecutionContext(indexReader.leaves().get(0), null, null, null)
                 );
 
                 /*