ESQL: Compute engine support for tagged queries (#128521)

Begins adding support for running "tagged queries" to the compute
engine. Here, it's just the `LuceneSourceOperator` because that's
useful and contained.

Example time! Say you are running:
```esql
FROM foo
| STATS MAX(v) BY ROUND_TO(g, 0, 100, 1000, 100000)
```

It's *often* faster to run this as four queries:
* The docs that round to `0`
* The docs that round to `100`
* The docs that round to `1000`
* The docs that round to `100000`

This creates an ESQL operator that can run these queries, one after the
other, attaching those tags to the pages it emits.
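
A minimal sketch of what those four tagged queries could look like, built on the `QueryAndTags` record this change introduces. It assumes `g` is indexed as a long point; the plumbing from `ROUND_TO` to these queries is not part of this change (the new `testPushRoundToToQuery` below builds them the same way by hand):

```java
import java.util.List;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.elasticsearch.compute.lucene.LuceneSliceQueue;

class RoundToQueriesSketch {
    // One mutually exclusive range query per rounding point; the tag is the
    // rounded value attached to every document the query matches.
    static List<LuceneSliceQueue.QueryAndTags> roundToQueries() {
        Query first = LongPoint.newRangeQuery("g", Long.MIN_VALUE, 99);
        Query second = LongPoint.newRangeQuery("g", 100, 999);
        Query third = LongPoint.newRangeQuery("g", 1_000, 99_999);
        Query fourth = LongPoint.newRangeQuery("g", 100_000, Long.MAX_VALUE);
        return List.of(
            new LuceneSliceQueue.QueryAndTags(first, List.of(0L)),
            new LuceneSliceQueue.QueryAndTags(second, List.of(100L)),
            new LuceneSliceQueue.QueryAndTags(third, List.of(1_000L)),
            new LuceneSliceQueue.QueryAndTags(fourth, List.of(100_000L))
        );
    }
}
```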

Aggs uses this trick and it's *way* faster when it can push down count
queries, and it's still faster even when it only pushes doc loading. This
implementation in `LuceneSourceOperator` is quite similar to the
doc-loading version in _search.

I don't have performance measurements yet because I haven't plugged this
into the language. In _search we call this `filter-by-filter` and enable it
when each group averages more than 5000 documents and there isn't a
`_doc_count` field; outside those cases it's faster not to push. I expect
we'll behave pretty similarly.
Nik Everett, 4 months ago
Commit 1b151eda4b
27 files changed, with 431 additions and 118 deletions
  1. docs/reference/query-languages/esql/_snippets/functions/appendix/values.md (+3 -2)
  2. docs/reference/query-languages/esql/kibana/definition/functions/values.json (+1 -1)
  3. docs/reference/query-languages/esql/kibana/docs/functions/values.md (+2 -1)
  4. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java (+4 -2)
  5. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java (+1 -2)
  6. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java (+1 -2)
  7. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java (+3 -0)
  8. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java (+16 -4)
  9. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java (+1 -1)
  10. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java (+63 -22)
  11. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java (+13 -11)
  12. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java (+4 -1)
  13. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java (+3 -0)
  14. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java (+2 -3)
  15. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java (+118 -29)
  16. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java (+7 -1)
  17. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java (+9 -1)
  18. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java (+9 -1)
  19. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java (+1 -1)
  20. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java (+147 -18)
  21. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java (+3 -2)
  22. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java (+3 -2)
  23. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java (+4 -2)
  24. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java (+3 -3)
  25. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java (+3 -3)
  26. x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java (+2 -1)
  27. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java (+5 -2)

+ 3 - 2
docs/reference/query-languages/esql/_snippets/functions/appendix/values.md

@@ -1,7 +1,8 @@
 % This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
 
-::::{note}
-Use [`TOP`](/reference/query-languages/esql/functions-operators/aggregation-functions.md#esql-top) if you need to keep repeated values.
+::::{tip}
+Use [`TOP`](/reference/query-languages/esql/functions-operators/aggregation-functions.md#esql-top)
+if you need to keep repeated values.
 ::::
 ::::{warning}
 This can use a significant amount of memory and ES|QL doesn’t yet

+ 1 - 1
docs/reference/query-languages/esql/kibana/definition/functions/values.json

@@ -2,7 +2,7 @@
   "comment" : "This is generated by ESQL’s AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.",
   "type" : "agg",
   "name" : "values",
-  "description" : "Returns unique values as a multivalued field. The order of the returned values isn’t guaranteed.\nIf you need the values returned in order use `MV_SORT`.",
+  "description" : "Returns unique values as a multivalued field. The order of the returned values isn’t guaranteed.\nIf you need the values returned in order use\n`MV_SORT`.",
   "signatures" : [
     {
       "params" : [

+ 2 - 1
docs/reference/query-languages/esql/kibana/docs/functions/values.md

@@ -2,7 +2,8 @@
 
 ### VALUES
 Returns unique values as a multivalued field. The order of the returned values isn’t guaranteed.
-If you need the values returned in order use [`MV_SORT`](https://www.elastic.co/docs/reference/query-languages/esql/functions-operators/mv-functions#esql-mv_sort).
+If you need the values returned in order use
+[`MV_SORT`](https://www.elastic.co/docs/reference/query-languages/esql/functions-operators/mv-functions#esql-mv_sort).
 
 ```esql
 FROM employees

+ 4 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

@@ -9,7 +9,6 @@ package org.elasticsearch.compute.lucene;
 
 import org.apache.lucene.search.DocIdStream;
 import org.apache.lucene.search.LeafCollector;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
@@ -44,7 +43,7 @@ public class LuceneCountOperator extends LuceneOperator {
 
         public Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
             DataPartitioning dataPartitioning,
             int taskConcurrency,
             int limit
@@ -121,6 +120,9 @@ public class LuceneCountOperator extends LuceneOperator {
             if (scorer == null) {
                 remainingDocs = 0;
             } else {
+                if (scorer.tags().isEmpty() == false) {
+                    throw new UnsupportedOperationException("tags not supported by " + getClass());
+                }
                 Weight weight = scorer.weight();
                 var leafReaderContext = scorer.leafReaderContext();
                 // see org.apache.lucene.search.TotalHitCountCollector

+ 1 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java

@@ -10,7 +10,6 @@ package org.elasticsearch.compute.lucene;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final class LuceneMaxFactory extends LuceneOperator.Factory {
 
     public LuceneMaxFactory(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
         DataPartitioning dataPartitioning,
         int taskConcurrency,
         String fieldName,

+ 1 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java

@@ -10,7 +10,6 @@ package org.elasticsearch.compute.lucene;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final class LuceneMinFactory extends LuceneOperator.Factory {
 
     public LuceneMinFactory(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
         DataPartitioning dataPartitioning,
         int taskConcurrency,
         String fieldName,

+ 3 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java

@@ -102,6 +102,9 @@ final class LuceneMinMaxOperator extends LuceneOperator {
             if (scorer == null) {
                 remainingDocs = 0;
             } else {
+                if (scorer.tags().isEmpty() == false) {
+                    throw new UnsupportedOperationException("tags not supported by " + getClass());
+                }
                 final LeafReader reader = scorer.leafReaderContext().reader();
                 final Query query = scorer.weight().getQuery();
                 if (query == null || query instanceof MatchAllDocsQuery) {

+ 16 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

@@ -97,7 +97,7 @@ public abstract class LuceneOperator extends SourceOperator {
          */
         protected Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
             DataPartitioning dataPartitioning,
             Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
             int taskConcurrency,
@@ -155,10 +155,13 @@ public abstract class LuceneOperator extends SourceOperator {
             final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
             logger.trace("Starting {}", partialLeaf);
             final LeafReaderContext leaf = partialLeaf.leafReaderContext();
-            if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
+            if (currentScorer == null // First time
+                || currentScorer.leafReaderContext() != leaf // Moved to a new leaf
+                || currentScorer.weight != currentSlice.weight() // Moved to a new query
+            ) {
                 final Weight weight = currentSlice.weight();
                 processedQueries.add(weight.getQuery());
-                currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, leaf);
+                currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
             }
             assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
             currentScorer.maxPosition = partialLeaf.maxDoc();
@@ -177,15 +180,17 @@ public abstract class LuceneOperator extends SourceOperator {
         private final ShardContext shardContext;
         private final Weight weight;
         private final LeafReaderContext leafReaderContext;
+        private final List<Object> tags;
 
         private BulkScorer bulkScorer;
         private int position;
         private int maxPosition;
         private Thread executingThread;
 
-        LuceneScorer(ShardContext shardContext, Weight weight, LeafReaderContext leafReaderContext) {
+        LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
             this.shardContext = shardContext;
             this.weight = weight;
+            this.tags = tags;
             this.leafReaderContext = leafReaderContext;
             reinitialize();
         }
@@ -230,6 +235,13 @@ public abstract class LuceneOperator extends SourceOperator {
         int position() {
             return position;
         }
+
+        /**
+         * Tags to add to the data returned by this query.
+         */
+        List<Object> tags() {
+            return tags;
+        }
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSlice.java

@@ -14,7 +14,7 @@ import java.util.List;
 /**
  * Holds a list of multiple partial Lucene segments
  */
-public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight) {
+public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) {
     int numLeaves() {
         return leaves.size();
     }

+ 63 - 22
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

@@ -32,8 +32,47 @@ import java.util.function.Function;
 
 /**
  * Shared Lucene slices between Lucene operators.
+ * <p>
+ *     Each shard is {@link #create built} with a list of queries to run and
+ *     tags to add to the queries ({@code List<QueryAndTags>}). Some examples:
+ * </p>
+ * <ul>
+ *     <li>
+ *         For queries like {@code FROM foo} we'll use a one element list
+ *         containing {@code match_all, []}. It loads all documents in the
+ *         index and appends no extra fields to the loaded documents.
+ *     </li>
+ *     <li>
+ *         For queries like {@code FROM foo | WHERE a > 10} we'll use a one
+ *         element list containing {@code +single_value(a) +(a > 10), []}.
+ *         It loads all documents where {@code a} is single valued and
+ *         greater than 10.
+ *     </li>
+ *     <li>
+ *         For queries like {@code FROM foo | STATS MAX(b) BY ROUND_TO(a, 0, 100)}
+ *         we'll use a two element list containing
+ *         <ul>
+ *             <li>{@code +single_value(a) +(a < 100), [0]}</li>
+ *             <li>{@code +single_value(a) +(a >= 100), [100]}</li>
+ *         </ul>
+ *         It loads all documents in the index where {@code a} is single
+ *         valued and adds a constant {@code 0} to the documents where
+ *         {@code a < 100} and the constant {@code 100} to the documents
+ *         where {@code a >= 100}.
+ *     </li>
+ * </ul>
+ * <p>
+ *     IMPORTANT: Runners make no effort to deduplicate the results from multiple
+ *     queries. If you need to only see each document one time then make sure the
+ *     queries are mutually exclusive.
+ * </p>
  */
 public final class LuceneSliceQueue {
+    /**
+     * Query to run and tags to add to the results.
+     */
+    public record QueryAndTags(Query query, List<Object> tags) {}
+
     public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
     public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
 
@@ -69,7 +108,7 @@ public final class LuceneSliceQueue {
 
     public static LuceneSliceQueue create(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<QueryAndTags>> queryFunction,
         DataPartitioning dataPartitioning,
         Function<Query, PartitioningStrategy> autoStrategy,
         int taskConcurrency,
@@ -78,27 +117,29 @@ public final class LuceneSliceQueue {
         List<LuceneSlice> slices = new ArrayList<>();
         Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());
         for (ShardContext ctx : contexts) {
-            Query query = queryFunction.apply(ctx);
-            query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
-            /*
-             * Rewrite the query on the local index so things like fully
-             * overlapping range queries become match all. It's important
-             * to do this before picking the partitioning strategy so we
-             * can pick more aggressive strategies when the query rewrites
-             * into MatchAll.
-             */
-            try {
-                query = ctx.searcher().rewrite(query);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-            PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
-            partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
-            List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
-            Weight weight = weight(ctx, query, scoreMode);
-            for (List<PartialLeafReaderContext> group : groups) {
-                if (group.isEmpty() == false) {
-                    slices.add(new LuceneSlice(ctx, group, weight));
+            for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) {
+                Query query = queryAndExtra.query;
+                query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
+                /*
+                 * Rewrite the query on the local index so things like fully
+                 * overlapping range queries become match all. It's important
+                 * to do this before picking the partitioning strategy so we
+                 * can pick more aggressive strategies when the query rewrites
+                 * into MatchAll.
+                 */
+                try {
+                    query = ctx.searcher().rewrite(query);
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+                PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
+                partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
+                List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
+                Weight weight = weight(ctx, query, scoreMode);
+                for (List<PartialLeafReaderContext> group : groups) {
+                    if (group.isEmpty() == false) {
+                        slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags));
+                    }
                 }
             }
         }
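
For callers that don't tag anything, the new `queryFunction` contract degenerates to a one-element list with an empty tag list, which is exactly how every existing call site is migrated below. A sketch of that untagged shape:

```java
import java.util.List;
import java.util.function.Function;

import org.apache.lucene.search.MatchAllDocsQuery;
import org.elasticsearch.compute.lucene.LuceneSliceQueue;
import org.elasticsearch.compute.lucene.ShardContext;

class UntaggedQueryFunctionSketch {
    // One query, no tags: every slice created from this function carries an
    // empty tag list, so operators that reject tags keep working unchanged.
    static Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> matchAll() {
        return ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of()));
    }
}
```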

+ 13 - 11
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

@@ -17,8 +17,9 @@ import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
+import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.DocBlock;
+import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.IntVector;
@@ -64,7 +65,7 @@ public class LuceneSourceOperator extends LuceneOperator {
 
         public Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
             DataPartitioning dataPartitioning,
             int taskConcurrency,
             int maxPageSize,
@@ -320,28 +321,29 @@ public class LuceneSourceOperator extends LuceneOperator {
                 IntVector shard = null;
                 IntVector leaf = null;
                 IntVector docs = null;
-                DoubleVector scores = null;
-                DocBlock docBlock = null;
+                Block[] blocks = new Block[1 + (scoreBuilder == null ? 0 : 1) + scorer.tags().size()];
                 currentPagePos -= discardedDocs;
                 try {
                     shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
                     leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
                     docs = buildDocsVector(currentPagePos);
                     docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                    docBlock = new DocVector(shard, leaf, docs, true).asBlock();
+                    int b = 0;
+                    blocks[b++] = new DocVector(shard, leaf, docs, true).asBlock();
                     shard = null;
                     leaf = null;
                     docs = null;
-                    if (scoreBuilder == null) {
-                        page = new Page(currentPagePos, docBlock);
-                    } else {
-                        scores = buildScoresVector(currentPagePos);
+                    if (scoreBuilder != null) {
+                        blocks[b++] = buildScoresVector(currentPagePos).asBlock();
                         scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
-                        page = new Page(currentPagePos, docBlock, scores.asBlock());
                     }
+                    for (Object e : scorer.tags()) {
+                        blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos);
+                    }
+                    page = new Page(currentPagePos, blocks);
                 } finally {
                     if (page == null) {
-                        Releasables.closeExpectNoException(shard, leaf, docs, docBlock, scores);
+                        Releasables.closeExpectNoException(shard, leaf, docs, Releasables.wrap(blocks));
                     }
                 }
                 currentPagePos = 0;
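
With this change a page emitted by `LuceneSourceOperator` lays out as: the doc block at index 0, an optional score block, then one constant block per tag, then any loaded fields. A hedged sketch of how a consumer might index into that layout, mirroring `initialBlockIndex` in `LuceneSourceOperatorTests` below; `scoring` and `tagCount` are assumed to be known from the factory that produced the page:

```java
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;

class TaggedPageLayoutSketch {
    // Returns the first block after the doc block, the optional score block,
    // and the constant tag blocks, i.e. the first loaded field.
    static Block firstBlockAfterTags(Page page, boolean scoring, int tagCount) {
        int b = 1;         // block 0 is always the DocVector
        if (scoring) {
            b++;           // scores, when enabled, sit at index 1
        }
        b += tagCount;     // one constant block per tag follows
        return page.getBlock(b);
    }
}
```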

+ 4 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

@@ -58,7 +58,7 @@ public final class LuceneTopNSourceOperator extends LuceneOperator {
 
         public Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Query> queryFunction,
+            Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
             DataPartitioning dataPartitioning,
             int taskConcurrency,
             int maxPageSize,
@@ -171,6 +171,9 @@ public final class LuceneTopNSourceOperator extends LuceneOperator {
             return emit(true);
         }
         try {
+            if (scorer.tags().isEmpty() == false) {
+                throw new UnsupportedOperationException("tags not supported by " + getClass());
+            }
             if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) {
                 // TODO: share the bottom between shardCollectors
                 perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit);

+ 3 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java

@@ -116,6 +116,9 @@ public final class TimeSeriesSourceOperator extends LuceneOperator {
                     doneCollecting = true;
                     return null;
                 }
+                if (slice.tags().isEmpty() == false) {
+                    throw new UnsupportedOperationException("tags not supported by " + getClass());
+                }
                 Releasables.close(fieldsReader);
                 fieldsReader = new ShardLevelFieldsReader(blockFactory, slice.shardContext(), fieldsToExtracts);
                 iterator = new SegmentsIterator(slice);

+ 2 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorFactory.java

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.compute.lucene;
 
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreMode;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.SourceOperator;
@@ -37,7 +36,7 @@ public class TimeSeriesSourceOperatorFactory extends LuceneOperator.Factory {
         List<? extends ShardContext> contexts,
         boolean emitDocIds,
         List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
-        Function<ShardContext, Query> queryFunction,
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
         int taskConcurrency,
         int maxPageSize,
         int limit
@@ -74,7 +73,7 @@ public class TimeSeriesSourceOperatorFactory extends LuceneOperator.Factory {
         boolean emitDocIds,
         List<? extends ShardContext> contexts,
         List<ValuesSourceReaderOperator.FieldInfo> fieldsToExact,
-        Function<ShardContext, Query> queryFunction
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction
     ) {
         return new TimeSeriesSourceOperatorFactory(contexts, emitDocIds, fieldsToExact, queryFunction, taskConcurrency, maxPageSize, limit);
     }

+ 118 - 29
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -9,6 +9,7 @@ package org.elasticsearch.compute;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
@@ -43,9 +44,11 @@ import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneOperator;
+import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests;
 import org.elasticsearch.compute.lucene.ShardContext;
@@ -54,7 +57,6 @@ import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.HashAggregationOperator;
-import org.elasticsearch.compute.operator.LimitOperator;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.PageConsumerOperator;
@@ -64,9 +66,11 @@ import org.elasticsearch.compute.test.BlockTestUtils;
 import org.elasticsearch.compute.test.OperatorTestCase;
 import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
 import org.elasticsearch.compute.test.TestDriverFactory;
+import org.elasticsearch.compute.test.TestResultPageSinkOperator;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.BlockDocValuesReader;
 import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
@@ -75,7 +79,6 @@ import org.elasticsearch.index.mapper.SourceLoader;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.search.lookup.SearchLookup;
-import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -90,9 +93,9 @@ import java.util.TreeMap;
 import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
 import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL;
 import static org.elasticsearch.compute.test.OperatorTestCase.randomPageSize;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 /**
  * This venerable test builds {@link Driver}s by hand and runs them together, simulating
@@ -112,7 +115,11 @@ public class OperatorTests extends MapperServiceTestCase {
             final long from = randomBoolean() ? Long.MIN_VALUE : randomLongBetween(0, 10000);
             final long to = randomBoolean() ? Long.MAX_VALUE : randomLongBetween(from, from + 10000);
             final Query query = LongPoint.newRangeQuery("pt", from, to);
-            LuceneOperator.Factory factory = luceneOperatorFactory(reader, query, LuceneOperator.NO_LIMIT);
+            LuceneOperator.Factory factory = luceneOperatorFactory(
+                reader,
+                List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
+                LuceneOperator.NO_LIMIT
+            );
             List<Driver> drivers = new ArrayList<>();
             try {
                 Set<Integer> actualDocIds = ConcurrentCollections.newConcurrentSet();
@@ -221,7 +228,11 @@ public class OperatorTests extends MapperServiceTestCase {
                 );
                 Driver driver = TestDriverFactory.create(
                     driverContext,
-                    luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT).get(driverContext),
+                    luceneOperatorFactory(
+                        reader,
+                        List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
+                        LuceneOperator.NO_LIMIT
+                    ).get(driverContext),
                     operators,
                     new PageConsumerOperator(page -> {
                         BytesRefBlock keys = page.getBlock(0);
@@ -243,31 +254,109 @@ public class OperatorTests extends MapperServiceTestCase {
         assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
     }
 
-    public void testLimitOperator() {
-        var positions = 100;
-        var limit = randomIntBetween(90, 101);
-        var values = randomList(positions, positions, ESTestCase::randomLong);
+    public void testPushRoundToToQuery() throws IOException {
+        long firstGroupMax = randomLong();
+        long secondGroupMax = randomLong();
+        long thirdGroupMax = randomLong();
 
-        var results = new ArrayList<Long>();
-        DriverContext driverContext = driverContext();
-        try (
-            var driver = TestDriverFactory.create(
-                driverContext,
-                new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
-                List.of((new LimitOperator.Factory(limit)).get(driverContext)),
-                new PageConsumerOperator(page -> {
-                    LongBlock block = page.getBlock(0);
-                    for (int i = 0; i < page.getPositionCount(); i++) {
-                        results.add(block.getLong(i));
+        CheckedConsumer<DirectoryReader, IOException> verifier = reader -> {
+            Query firstGroupQuery = LongPoint.newRangeQuery("g", Long.MIN_VALUE, 99);
+            Query secondGroupQuery = LongPoint.newRangeQuery("g", 100, 9999);
+            Query thirdGroupQuery = LongPoint.newRangeQuery("g", 10000, Long.MAX_VALUE);
+
+            LuceneSliceQueue.QueryAndTags firstGroupQueryAndTags = new LuceneSliceQueue.QueryAndTags(firstGroupQuery, List.of(0L));
+            LuceneSliceQueue.QueryAndTags secondGroupQueryAndTags = new LuceneSliceQueue.QueryAndTags(secondGroupQuery, List.of(100L));
+            LuceneSliceQueue.QueryAndTags thirdGroupQueryAndTags = new LuceneSliceQueue.QueryAndTags(thirdGroupQuery, List.of(10000L));
+
+            LuceneOperator.Factory factory = luceneOperatorFactory(
+                reader,
+                List.of(firstGroupQueryAndTags, secondGroupQueryAndTags, thirdGroupQueryAndTags),
+                LuceneOperator.NO_LIMIT
+            );
+            ValuesSourceReaderOperator.Factory load = new ValuesSourceReaderOperator.Factory(
+                List.of(
+                    new ValuesSourceReaderOperator.FieldInfo("v", ElementType.LONG, f -> new BlockDocValuesReader.LongsBlockLoader("v"))
+                ),
+                List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
+                    throw new UnsupportedOperationException();
+                }, 0.8)),
+                0
+            );
+            List<Page> pages = new ArrayList<>();
+            DriverContext driverContext = driverContext();
+            try (
+                Driver driver = TestDriverFactory.create(
+                    driverContext,
+                    factory.get(driverContext),
+                    List.of(load.get(driverContext)),
+                    new TestResultPageSinkOperator(pages::add)
+                )
+            ) {
+                OperatorTestCase.runDriver(driver);
+            }
+            assertDriverContext(driverContext);
+
+            boolean sawFirstMax = false;
+            boolean sawSecondMax = false;
+            boolean sawThirdMax = false;
+            for (Page page : pages) {
+                logger.error("ADFA {}", page);
+                LongVector group = page.<LongBlock>getBlock(1).asVector();
+                LongVector value = page.<LongBlock>getBlock(2).asVector();
+                for (int p = 0; p < page.getPositionCount(); p++) {
+                    long g = group.getLong(p);
+                    long v = value.getLong(p);
+                    switch ((int) g) {
+                        case 0 -> {
+                            assertThat(v, lessThanOrEqualTo(firstGroupMax));
+                            sawFirstMax |= v == firstGroupMax;
+                        }
+                        case 100 -> {
+                            assertThat(v, lessThanOrEqualTo(secondGroupMax));
+                            sawSecondMax |= v == secondGroupMax;
+                        }
+                        case 10000 -> {
+                            assertThat(v, lessThanOrEqualTo(thirdGroupMax));
+                            sawThirdMax |= v == thirdGroupMax;
+                        }
+                        default -> throw new IllegalArgumentException("Unknown group [" + g + "]");
                     }
-                })
-            )
-        ) {
-            OperatorTestCase.runDriver(driver);
-        }
+                }
+            }
+            assertTrue(sawFirstMax);
+            assertTrue(sawSecondMax);
+            assertTrue(sawThirdMax);
+        };
 
-        assertThat(results, contains(values.stream().limit(limit).toArray()));
-        assertDriverContext(driverContext);
+        try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
+            int numDocs = randomIntBetween(0, 10_000);
+            for (int i = 0; i < numDocs; i++) {
+                long g, v;
+                switch (between(0, 2)) {
+                    case 0 -> {
+                        g = randomLongBetween(Long.MIN_VALUE, 99);
+                        v = randomLongBetween(Long.MIN_VALUE, firstGroupMax);
+                    }
+                    case 1 -> {
+                        g = randomLongBetween(100, 9999);
+                        v = randomLongBetween(Long.MIN_VALUE, secondGroupMax);
+                    }
+                    case 2 -> {
+                        g = randomLongBetween(10000, Long.MAX_VALUE);
+                        v = randomLongBetween(Long.MIN_VALUE, thirdGroupMax);
+                    }
+                    default -> throw new IllegalArgumentException();
+                }
+                w.addDocument(List.of(new LongField("g", g, Field.Store.NO), new LongField("v", v, Field.Store.NO)));
+            }
+            w.addDocument(List.of(new LongField("g", 0, Field.Store.NO), new LongField("v", firstGroupMax, Field.Store.NO)));
+            w.addDocument(List.of(new LongField("g", 200, Field.Store.NO), new LongField("v", secondGroupMax, Field.Store.NO)));
+            w.addDocument(List.of(new LongField("g", 20000, Field.Store.NO), new LongField("v", thirdGroupMax, Field.Store.NO)));
+
+            try (DirectoryReader reader = w.getReader()) {
+                verifier.accept(reader);
+            }
+        }
     }
 
     private static Set<Integer> searchForDocIds(IndexReader reader, Query query) throws IOException {
@@ -388,11 +477,11 @@ public class OperatorTests extends MapperServiceTestCase {
         assertThat(driverContext.getSnapshot().releasables(), empty());
     }
 
-    static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, Query query, int limit) {
+    static LuceneOperator.Factory luceneOperatorFactory(IndexReader reader, List<LuceneSliceQueue.QueryAndTags> queryAndTags, int limit) {
         final ShardContext searchContext = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
         return new LuceneSourceOperator.Factory(
             List.of(searchContext),
-            ctx -> query,
+            ctx -> queryAndTags,
             randomFrom(DataPartitioning.values()),
             randomIntBetween(1, 10),
             randomPageSize(),

+ 7 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java

@@ -90,7 +90,13 @@ public class LuceneCountOperatorTests extends AnyOperatorTestCase {
         } else {
             query = LongPoint.newRangeQuery("s", 0, numDocs);
         }
-        return new LuceneCountOperator.Factory(List.of(ctx), c -> query, dataPartitioning, between(1, 8), limit);
+        return new LuceneCountOperator.Factory(
+            List.of(ctx),
+            c -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
+            dataPartitioning,
+            between(1, 8),
+            limit
+        );
     }
 
     @Override

+ 9 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java

@@ -116,7 +116,15 @@ public abstract class LuceneMaxOperatorTestCase extends AnyOperatorTestCase {
         } else {
             query = SortedNumericDocValuesField.newSlowRangeQuery(FIELD_NAME, Long.MIN_VALUE, Long.MAX_VALUE);
         }
-        return new LuceneMaxFactory(List.of(ctx), c -> query, dataPartitioning, between(1, 8), FIELD_NAME, getNumberType(), limit);
+        return new LuceneMaxFactory(
+            List.of(ctx),
+            c -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
+            dataPartitioning,
+            between(1, 8),
+            FIELD_NAME,
+            getNumberType(),
+            limit
+        );
     }
 
     public void testSimple() {

+ 9 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java

@@ -116,7 +116,15 @@ public abstract class LuceneMinOperatorTestCase extends AnyOperatorTestCase {
         } else {
             query = SortedNumericDocValuesField.newSlowRangeQuery(FIELD_NAME, Long.MIN_VALUE, Long.MAX_VALUE);
         }
-        return new LuceneMinFactory(List.of(ctx), c -> query, dataPartitioning, between(1, 8), FIELD_NAME, getNumberType(), limit);
+        return new LuceneMinFactory(
+            List.of(ctx),
+            c -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
+            dataPartitioning,
+            between(1, 8),
+            FIELD_NAME,
+            getNumberType(),
+            limit
+        );
     }
 
     public void testSimple() {

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java

@@ -273,7 +273,7 @@ public abstract class LuceneQueryEvaluatorTests<T extends Vector, U extends Vect
         final ShardContext searchContext = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
         return new LuceneSourceOperator.Factory(
             List.of(searchContext),
-            ctx -> query,
+            ctx -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of())),
             randomFrom(DataPartitioning.values()),
             randomIntBetween(1, 10),
             randomPageSize(),

+ 147 - 18
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

@@ -7,19 +7,21 @@
 
 package org.elasticsearch.compute.lucene;
 
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
 import org.apache.lucene.document.SortedNumericDocValuesField;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.compute.data.DocBlock;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.Driver;
@@ -63,9 +65,117 @@ import static org.hamcrest.Matchers.matchesRegex;
 
 public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
     private static final MappedFieldType S_FIELD = new NumberFieldMapper.NumberFieldType("s", NumberFieldMapper.NumberType.LONG);
+
+    @ParametersFactory(argumentFormatting = "%s %s")
+    public static Iterable<Object[]> parameters() {
+        List<Object[]> parameters = new ArrayList<>();
+        for (TestCase c : TestCase.values()) {
+            for (boolean scoring : new boolean[] { false, true }) {
+                parameters.add(new Object[] { c, scoring });
+            }
+        }
+        return parameters;
+    }
+
+    public enum TestCase {
+        MATCH_ALL {
+            @Override
+            List<LuceneSliceQueue.QueryAndTags> queryAndExtra() {
+                return List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of()));
+            }
+
+            @Override
+            void checkPages(int numDocs, int limit, int maxPageSize, List<Page> results) {
+                int maxPages = Math.min(numDocs, limit);
+                int minPages = (int) Math.ceil((double) maxPages / maxPageSize);
+                assertThat(results, hasSize(both(greaterThanOrEqualTo(minPages)).and(lessThanOrEqualTo(maxPages))));
+            }
+
+            @Override
+            int numResults(int numDocs) {
+                return numDocs;
+            }
+        },
+        MATCH_0_AND_1 {
+            @Override
+            List<LuceneSliceQueue.QueryAndTags> queryAndExtra() {
+                return List.of(
+                    new LuceneSliceQueue.QueryAndTags(SortedNumericDocValuesField.newSlowExactQuery("s", 0), List.of(123)),
+                    new LuceneSliceQueue.QueryAndTags(SortedNumericDocValuesField.newSlowExactQuery("s", 1), List.of(456))
+                );
+            }
+
+            @Override
+            void checkPages(int numDocs, int limit, int maxPageSize, List<Page> results) {
+                assertThat(results, hasSize(Math.min(numDocs, 2)));
+                if (results.isEmpty() == false) {
+                    Page page = results.get(0);
+                    IntBlock extra = page.getBlock(page.getBlockCount() - 2);
+                    assertThat(extra.asVector().isConstant(), equalTo(true));
+                    assertThat(extra.getInt(0), equalTo(123));
+                    if (results.size() > 1) {
+                        page = results.get(1);
+                        extra = page.getBlock(page.getBlockCount() - 2);
+                        assertThat(extra.asVector().isConstant(), equalTo(true));
+                        assertThat(extra.getInt(0), equalTo(456));
+                    }
+                }
+            }
+
+            @Override
+            int numResults(int numDocs) {
+                return Math.min(numDocs, 2);
+            }
+        },
+        LTE_100_GT_100 {
+            @Override
+            List<LuceneSliceQueue.QueryAndTags> queryAndExtra() {
+                return List.of(
+                    new LuceneSliceQueue.QueryAndTags(SortedNumericDocValuesField.newSlowRangeQuery("s", 0, 100), List.of(123)),
+                    new LuceneSliceQueue.QueryAndTags(SortedNumericDocValuesField.newSlowRangeQuery("s", 101, Long.MAX_VALUE), List.of(456))
+                );
+            }
+
+            @Override
+            void checkPages(int numDocs, int limit, int maxPageSize, List<Page> results) {
+                MATCH_ALL.checkPages(numDocs, limit, maxPageSize, results);
+                for (Page page : results) {
+                    IntBlock extra = page.getBlock(page.getBlockCount() - 2);
+                    LongBlock data = page.getBlock(page.getBlockCount() - 1);
+                    for (int p = 0; p < page.getPositionCount(); p++) {
+                        assertThat(extra.getInt(p), equalTo(data.getLong(p) <= 100 ? 123 : 456));
+                    }
+                }
+            }
+
+            @Override
+            int numResults(int numDocs) {
+                return numDocs;
+            }
+        };
+
+        abstract List<LuceneSliceQueue.QueryAndTags> queryAndExtra();
+
+        abstract void checkPages(int numDocs, int limit, int maxPageSize, List<Page> results);
+
+        abstract int numResults(int numDocs);
+    }
+
+    private final TestCase testCase;
+    /**
+     * Do we enable scoring? We don't check the score in this test, but
+     * it's nice to make sure everything else works with scoring enabled.
+     */
+    private final boolean scoring;
+
     private Directory directory = newDirectory();
     private IndexReader reader;
 
+    public LuceneSourceOperatorTests(TestCase testCase, boolean scoring) {
+        this.testCase = testCase;
+        this.scoring = scoring;
+    }
+
     @After
     public void closeIndex() throws IOException {
         IOUtils.close(reader, directory);
@@ -94,12 +204,19 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
                 }
             }
             reader = writer.getReader();
+
+            IndexSearcher searcher = new IndexSearcher(reader);
+            int count = 0;
+            for (LuceneSliceQueue.QueryAndTags q : testCase.queryAndExtra()) {
+                count += searcher.count(q.query());
+            }
+            assertThat(count, equalTo(testCase.numResults(numDocs)));
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
 
         ShardContext ctx = new MockShardContext(reader, 0);
-        Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction = c -> testCase.queryAndExtra();
         int maxPageSize = between(10, Math.max(10, numDocs));
         int taskConcurrency = randomIntBetween(1, 4);
         return new LuceneSourceOperator.Factory(
@@ -152,15 +269,17 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
     }
 
     public void testEarlyTermination() {
-        int size = between(1_000, 20_000);
-        int limit = between(0, Integer.MAX_VALUE);
-        LuceneSourceOperator.Factory factory = simple(randomFrom(DataPartitioning.values()), size, limit, scoring);
+        int numDocs = between(1_000, 20_000);
+        int limit = between(0, numDocs * 2);
+        LuceneSourceOperator.Factory factory = simple(randomFrom(DataPartitioning.values()), numDocs, limit, scoring);
         int taskConcurrency = factory.taskConcurrency();
         final AtomicInteger receivedRows = new AtomicInteger();
+        List<SourceOperator> sources = new ArrayList<>();
         List<Driver> drivers = new ArrayList<>();
         for (int i = 0; i < taskConcurrency; i++) {
             DriverContext driverContext = driverContext();
             SourceOperator sourceOperator = factory.get(driverContext);
+            sources.add(sourceOperator);
             SinkOperator sinkOperator = new PageConsumerOperator(p -> {
                 receivedRows.addAndGet(p.getPositionCount());
                 p.releaseBlocks();
@@ -183,7 +302,17 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
             drivers.add(driver);
         }
         OperatorTestCase.runDriver(drivers);
-        assertThat(receivedRows.get(), equalTo(Math.min(limit, size)));
+        for (SourceOperator source : sources) {
+            logger.info("source status {}", source.status());
+        }
+        logger.info(
+            "{} received={} limit={} numResults={}",
+            factory.dataPartitioning,
+            receivedRows.get(),
+            limit,
+            testCase.numResults(numDocs)
+        );
+        assertThat(receivedRows.get(), equalTo(Math.min(limit, testCase.numResults(numDocs))));
     }
 
     public void testEmpty() {
@@ -228,8 +357,8 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
         }
     }
 
-    private void testSimple(DriverContext ctx, DataPartitioning partitioning, int size, int limit) {
-        LuceneSourceOperator.Factory factory = simple(partitioning, size, limit, scoring);
+    private void testSimple(DriverContext ctx, DataPartitioning partitioning, int numDocs, int limit) {
+        LuceneSourceOperator.Factory factory = simple(partitioning, numDocs, limit, scoring);
         Operator.OperatorFactory readS = ValuesSourceReaderOperatorTests.factory(reader, S_FIELD, ElementType.LONG);
 
         List<Page> results = new ArrayList<>();
@@ -246,26 +375,26 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
         for (Page page : results) {
             LongBlock sBlock = page.getBlock(initialBlockIndex(page));
             for (int p = 0; p < page.getPositionCount(); p++) {
-                assertThat(sBlock.getLong(sBlock.getFirstValueIndex(p)), both(greaterThanOrEqualTo(0L)).and(lessThan((long) size)));
+                assertThat(sBlock.getLong(sBlock.getFirstValueIndex(p)), both(greaterThanOrEqualTo(0L)).and(lessThan((long) numDocs)));
             }
         }
-        int maxPages = Math.min(size, limit);
-        int minPages = (int) Math.ceil(maxPages / factory.maxPageSize());
-        assertThat(results, hasSize(both(greaterThanOrEqualTo(minPages)).and(lessThanOrEqualTo(maxPages))));
-    }
 
-    // Scores are not interesting to this test, but enabled conditionally and effectively ignored just for coverage.
-    private final boolean scoring = randomBoolean();
+        testCase.checkPages(numDocs, limit, factory.maxPageSize(), results);
+        int count = results.stream().mapToInt(Page::getPositionCount).sum();
+        logger.info("{} received={} limit={} numResults={}", factory.dataPartitioning, count, limit, testCase.numResults(numDocs));
+        assertThat(count, equalTo(Math.min(limit, testCase.numResults(numDocs))));
+    }
 
     // Returns the initial block index, ignoring the score block if scoring is enabled
     private int initialBlockIndex(Page page) {
         assert page.getBlock(0) instanceof DocBlock : "expected doc block at index 0";
+        int offset = 1;
         if (scoring) {
             assert page.getBlock(1) instanceof DoubleBlock : "expected double block at index 1";
-            return 2;
-        } else {
-            return 1;
+            offset++;
         }
+        offset += testCase.queryAndExtra().get(0).tags().size();
+        return offset;
     }
 
     /**

+ 3 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java

@@ -12,7 +12,6 @@ import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSelector;
@@ -91,7 +90,9 @@ public class LuceneTopNSourceOperatorScoringTests extends LuceneTopNSourceOperat
                 return Optional.of(new SortAndFormats(new Sort(field), new DocValueFormat[] { null }));
             }
         };
-        Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction = c -> List.of(
+            new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())
+        );
         int taskConcurrency = 0;
         int maxPageSize = between(10, Math.max(10, size));
         List<SortBuilder<?>> sorts = List.of(new FieldSortBuilder("s"));

+ 3 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java

@@ -12,7 +12,6 @@ import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSelector;
@@ -96,7 +95,9 @@ public class LuceneTopNSourceOperatorTests extends AnyOperatorTestCase {
                 return Optional.of(new SortAndFormats(new Sort(field), new DocValueFormat[] { null }));
             }
         };
-        Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction = c -> List.of(
+            new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())
+        );
         int taskConcurrency = 0;
         int maxPageSize = between(10, Math.max(10, size));
         List<SortBuilder<?>> sorts = List.of(new FieldSortBuilder("s"));

+ 4 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java

@@ -310,7 +310,7 @@ public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase {
                     randomBoolean(),
                     List.of(ctx),
                     List.of(),
-                    unused -> query
+                    unused -> List.of(new LuceneSliceQueue.QueryAndTags(query, List.of()))
                 );
                 var driverContext = driverContext();
                 List<Page> results = new ArrayList<>();
@@ -449,7 +449,9 @@ public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase {
                 throw new IllegalArgumentException("Unknown field [" + name + "]");
             }
         };
-        Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
+        Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction = c -> List.of(
+            new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())
+        );
 
         var fieldInfos = extractFields.stream()
             .map(

+ 3 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java

@@ -262,7 +262,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
         }
         var luceneFactory = new LuceneSourceOperator.Factory(
             shardContexts,
-            ctx -> new MatchAllDocsQuery(),
+            ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             DataPartitioning.SHARD,
             1,// randomIntBetween(1, 10),
             pageSize,
@@ -1290,7 +1290,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
         DriverContext driverContext = driverContext();
         var luceneFactory = new LuceneSourceOperator.Factory(
             List.of(shardContext),
-            ctx -> new MatchAllDocsQuery(),
+            ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             randomFrom(DataPartitioning.values()),
             randomIntBetween(1, 10),
             randomPageSize(),
@@ -1449,7 +1449,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
             }
             var luceneFactory = new LuceneSourceOperator.Factory(
                 contexts,
-                ctx -> new MatchAllDocsQuery(),
+                ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
                 DataPartitioning.SHARD,
                 randomIntBetween(1, 10),
                 1000,

+ 3 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java

@@ -178,7 +178,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         }
         var luceneFactory = new LuceneSourceOperator.Factory(
             List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
-            ctx -> new MatchAllDocsQuery(),
+            ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             DataPartitioning.SHARD,
             randomIntBetween(1, 10),
             pageSize,
@@ -1334,7 +1334,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         DriverContext driverContext = driverContext();
         var luceneFactory = new LuceneSourceOperator.Factory(
             List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
-            ctx -> new MatchAllDocsQuery(),
+            ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
             randomFrom(DataPartitioning.values()),
             randomIntBetween(1, 10),
             randomPageSize(),
@@ -1580,7 +1580,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             }
             var luceneFactory = new LuceneSourceOperator.Factory(
                 contexts,
-                ctx -> new MatchAllDocsQuery(),
+                ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
                 DataPartitioning.SHARD,
                 randomIntBetween(1, 10),
                 1000,

+ 2 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

@@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.ShardContext;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
@@ -187,7 +188,7 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
             );
             LuceneSourceOperator.Factory source = new LuceneSourceOperator.Factory(
                 List.of(esqlContext),
-                ctx -> new MatchAllDocsQuery(),
+                ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
                 DataPartitioning.SEGMENT,
                 1,
                 10000,

+ 5 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -21,6 +21,7 @@ import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneCountOperator;
 import org.elasticsearch.compute.lucene.LuceneOperator;
+import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
 import org.elasticsearch.compute.lucene.TimeSeriesSourceOperatorFactory;
@@ -192,9 +193,11 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         return null;
     }
 
-    public Function<org.elasticsearch.compute.lucene.ShardContext, Query> querySupplier(QueryBuilder builder) {
+    public Function<org.elasticsearch.compute.lucene.ShardContext, List<LuceneSliceQueue.QueryAndTags>> querySupplier(
+        QueryBuilder builder
+    ) {
         QueryBuilder qb = builder == null ? QueryBuilders.matchAllQuery().boost(0.0f) : builder;
-        return ctx -> shardContexts.get(ctx.index()).toQuery(qb);
+        return ctx -> List.of(new LuceneSliceQueue.QueryAndTags(shardContexts.get(ctx.index()).toQuery(qb), List.of()));
     }
 
     @Override