ESQL: Heuristics to pick efficient partitioning (#125739)

Adds heuristics to pick an efficient partitioning strategy based on the
index and rewritten query. This speeds up some queries by throwing more
cores at the problem:
```
FROM test | STATS SUM(b)

Before: took: 31 CPU: 222.3%
 After: took: 15 CPU: 806.9%
```

It also lowers the overhead of simpler queries by throwing fewer cores at
the problem when more cores wouldn't really speed anything up:
```
FROM test

Before: took: 1 CPU: 48.5%
 After: took: 1 CPU: 70.4%
```

We have had a `pragma` to control our data partitioning for a long time;
this change looks at the query to pick a partitioning scheme
automatically. The partitioning options:
* `shard`: use one core per shard
* `segment`: use one core per large segment
* `doc`: break each shard into as many slices as there are cores

`doc` is the fastest, but has a lot of overhead, especially for complex
Lucene queries. `segment` is fast, but doesn't make the most of the CPUs
when there are few segments. `shard` has the lowest overhead.
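
As a rough sketch of the `doc` scheme (a simplified illustration, not the exact implementation; the real code in `LuceneSliceQueue` below also walks segment boundaries and records which segment each range falls in), a shard's `maxDoc` is divided into `task_concurrency` contiguous doc-id ranges:
```
import java.util.ArrayList;
import java.util.List;

class DocSlicingSketch {
    // Split maxDoc into taskConcurrency [min, max) doc-id ranges. The first
    // range absorbs the remainder, matching the slice math in LuceneSliceQueue.
    static List<int[]> docRanges(int maxDoc, int taskConcurrency) {
        List<int[]> ranges = new ArrayList<>();
        int base = maxDoc / taskConcurrency;
        int extra = maxDoc % taskConcurrency;
        int min = 0;
        for (int i = 0; min < maxDoc; i++) {
            int size = base + (i == 0 ? extra : 0);
            ranges.add(new int[] { min, min + size });
            min += size;
        }
        return ranges;
    }
}
```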

Previously we always used `segment` partitioning because it is fast
without the terrible overhead. With this change we use `doc` when the top
level query matches all documents - those have very low overhead even in
the `doc` partitioning. That's the twice-as-fast example above.

This also uses the `shard` partitioning for queries that don't have much
work to do, like `FROM foo`, `FROM foo | LIMIT 1`, or
`FROM foo | SORT a`. That's the lower-CPU example above.

This partitioning choice is made very late, on the data node, after the
query has been rewritten against the local index. So queries like this:
```
FROM test | WHERE @timestamp > "2025-01-01T00:00:00Z" | STATS SUM(b)
```
can also use the `doc` partitioning when all documents are after the
timestamp and all documents have `b`.
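
A minimal sketch of that decision, mirroring `LuceneSliceQueue.create` and the source operator's auto strategy (simplified: the real code also unwraps `BoostQuery` wrappers, recurses into `BooleanQuery` clauses, and falls back to `shard` for small indices):
```
import java.io.IOException;

import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;

static LuceneSliceQueue.PartitioningStrategy pickForAuto(ShardContext ctx, Query query) throws IOException {
    // Rewrite against the local index so fully overlapping ranges become MatchAllDocsQuery.
    Query rewritten = ctx.searcher().rewrite(new ConstantScoreQuery(query));
    while (rewritten instanceof ConstantScoreQuery c) {
        rewritten = c.getQuery();
    }
    if (rewritten instanceof MatchAllDocsQuery) {
        return LuceneSliceQueue.PartitioningStrategy.DOC;   // match-all: DOC overhead is tiny
    }
    if (rewritten instanceof MatchNoDocsQuery) {
        return LuceneSliceQueue.PartitioningStrategy.SHARD; // matches nothing: minimize overhead
    }
    return LuceneSliceQueue.PartitioningStrategy.SEGMENT;   // general case
}
```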

Nik Everett committed 6 months ago (commit 5689dfa9bb)
25 changed files with 921 additions and 136 deletions
  1. docs/changelog/125739.yaml (+5 -0)
  2. server/src/main/java/org/elasticsearch/TransportVersions.java (+1 -0)
  3. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java (+31 -3)
  4. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java (+10 -1)
  5. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java (+10 -3)
  6. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java (+10 -3)
  7. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java (+38 -20)
  8. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java (+179 -55)
  9. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java (+122 -2)
  10. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java (+13 -1)
  11. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java (+10 -3)
  12. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java (+1 -1)
  13. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java (+1 -1)
  14. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java (+1 -1)
  15. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java (+127 -0)
  16. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java (+31 -5)
  17. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java (+43 -20)
  18. x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java (+229 -0)
  19. x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java (+2 -1)
  20. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java (+16 -5)
  21. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java (+10 -1)
  22. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java (+11 -1)
  23. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java (+16 -7)
  24. x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java (+2 -1)
  25. x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java (+2 -1)

+ 5 - 0
docs/changelog/125739.yaml

@@ -0,0 +1,5 @@
+pr: 125739
+summary: Heuristics to pick efficient partitioning
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -218,6 +218,7 @@ public class TransportVersions {
     public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_00_0);
     public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_048_00_0);
     public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS = def(9_049_00_0);
+    public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 31 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/DataPartitioning.java

@@ -7,11 +7,39 @@
 
 package org.elasticsearch.compute.lucene;
 
-public enum DataPartitioning {
+import org.elasticsearch.compute.operator.Driver;
 
+/**
+ * How we partition the data across {@link Driver}s. Each request forks into
+ * {@code min(1.5 * cpus, partition_count)} threads on the data node. More partitions
+ * allow us to bring more threads to bear on CPU intensive data node side tasks.
+ */
+public enum DataPartitioning {
+    /**
+     * Automatically select the data partitioning based on the query and index.
+     * Usually that's {@link #SEGMENT}, but for small indices it's {@link #SHARD}.
+     * When the additional overhead from {@link #DOC} is fairly low then it'll
+     * pick {@link #DOC}.
+     */
+    AUTO,
+    /**
+     * Make one partition per shard. This is generally the slowest option, but it
+     * has the lowest CPU overhead.
+     */
     SHARD,
-
+    /**
+     * Partition on segment boundaries. This doesn't allow forking to as many CPUs
+     * as {@link #DOC} but it has much lower overhead.
+     * <p>
+     * It packs segments smaller than {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE}
+     * docs together into a partition. Larger segments get their own partition.
+     * Each slice contains no more than {@link LuceneSliceQueue#MAX_SEGMENTS_PER_SLICE}.
+     */
     SEGMENT,
-
+    /**
+     * Partition each shard into {@code task_concurrency} partitions, splitting
+     * larger segments into slices. This allows bringing the most CPUs to bear on
+     * the problem but adds extra overhead, especially in query preparation.
+     */
     DOC,
 }

+ 10 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

@@ -49,7 +49,16 @@ public class LuceneCountOperator extends LuceneOperator {
             int taskConcurrency,
             int limit
         ) {
-            super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+            super(
+                contexts,
+                queryFunction,
+                dataPartitioning,
+                query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+                taskConcurrency,
+                limit,
+                false,
+                ScoreMode.COMPLETE_NO_SCORES
+            );
         }
 
         @Override

+ 10 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMaxFactory.java

@@ -23,8 +23,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.function.Function;
 
-import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;
-
 /**
  * Factory that generates an operator that finds the max value of a field using the {@link LuceneMinMaxOperator}.
  */
@@ -123,7 +121,16 @@ public final class LuceneMaxFactory extends LuceneOperator.Factory {
         NumberType numberType,
         int limit
     ) {
-        super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+        super(
+            contexts,
+            queryFunction,
+            dataPartitioning,
+            query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+            taskConcurrency,
+            limit,
+            false,
+            ScoreMode.COMPLETE_NO_SCORES
+        );
         this.fieldName = fieldName;
         this.numberType = numberType;
     }

+ 10 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinFactory.java

@@ -23,8 +23,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.function.Function;
 
-import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;
-
 /**
  * Factory that generates an operator that finds the min value of a field using the {@link LuceneMinMaxOperator}.
  */
@@ -123,7 +121,16 @@ public final class LuceneMinFactory extends LuceneOperator.Factory {
         NumberType numberType,
         int limit
     ) {
-        super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
+        super(
+            contexts,
+            queryFunction,
+            dataPartitioning,
+            query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+            taskConcurrency,
+            limit,
+            false,
+            ScoreMode.COMPLETE_NO_SCORES
+        );
         this.fieldName = fieldName;
         this.numberType = numberType;
     }

+ 38 - 20
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

@@ -9,7 +9,6 @@ package org.elasticsearch.compute.lucene;
 
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.BulkScorer;
-import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Query;
@@ -37,12 +36,16 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING;
+
 public abstract class LuceneOperator extends SourceOperator {
     private static final Logger logger = LogManager.getLogger(LuceneOperator.class);
 
@@ -93,15 +96,17 @@ public abstract class LuceneOperator extends SourceOperator {
          */
         protected Factory(
             List<? extends ShardContext> contexts,
-            Function<ShardContext, Weight> weightFunction,
+            Function<ShardContext, Query> queryFunction,
             DataPartitioning dataPartitioning,
+            Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
             int taskConcurrency,
             int limit,
-            boolean needsScore
+            boolean needsScore,
+            ScoreMode scoreMode
         ) {
             this.limit = limit;
             this.dataPartitioning = dataPartitioning;
-            this.sliceQueue = LuceneSliceQueue.create(contexts, weightFunction, dataPartitioning, taskConcurrency);
+            this.sliceQueue = LuceneSliceQueue.create(contexts, queryFunction, dataPartitioning, autoStrategy, taskConcurrency, scoreMode);
             this.taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency);
             this.needsScore = needsScore;
         }
@@ -269,6 +274,7 @@ public abstract class LuceneOperator extends SourceOperator {
         private final int sliceMax;
         private final int current;
         private final long rowsEmitted;
+        private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;
 
         private Status(LuceneOperator operator) {
             processedSlices = operator.processedSlices;
@@ -294,6 +300,7 @@ public abstract class LuceneOperator extends SourceOperator {
             }
             pagesEmitted = operator.pagesEmitted;
             rowsEmitted = operator.rowsEmitted;
+            partitioningStrategies = operator.sliceQueue.partitioningStrategies();
         }
 
         Status(
@@ -307,7 +314,8 @@ public abstract class LuceneOperator extends SourceOperator {
             int sliceMin,
             int sliceMax,
             int current,
-            long rowsEmitted
+            long rowsEmitted,
+            Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies
         ) {
             this.processedSlices = processedSlices;
             this.processedQueries = processedQueries;
@@ -320,6 +328,7 @@ public abstract class LuceneOperator extends SourceOperator {
             this.sliceMax = sliceMax;
             this.current = current;
             this.rowsEmitted = rowsEmitted;
+            this.partitioningStrategies = partitioningStrategies;
         }
 
         Status(StreamInput in) throws IOException {
@@ -343,6 +352,9 @@ public abstract class LuceneOperator extends SourceOperator {
             } else {
                 rowsEmitted = 0;
             }
+            partitioningStrategies = in.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)
+                ? in.readMap(LuceneSliceQueue.PartitioningStrategy::readFrom)
+                : Map.of();
         }
 
         @Override
@@ -364,6 +376,9 @@ public abstract class LuceneOperator extends SourceOperator {
             if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
                 out.writeVLong(rowsEmitted);
             }
+            if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)) {
+                out.writeMap(partitioningStrategies, StreamOutput::writeString, StreamOutput::writeWriteable);
+            }
         }
 
         @Override
@@ -415,6 +430,10 @@ public abstract class LuceneOperator extends SourceOperator {
             return rowsEmitted;
         }
 
+        public Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies() {
+            return partitioningStrategies;
+        }
+
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
@@ -432,6 +451,7 @@ public abstract class LuceneOperator extends SourceOperator {
             builder.field("slice_max", sliceMax);
             builder.field("current", current);
             builder.field("rows_emitted", rowsEmitted);
+            builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
             return builder.endObject();
         }
 
@@ -450,12 +470,23 @@ public abstract class LuceneOperator extends SourceOperator {
                 && sliceMin == status.sliceMin
                 && sliceMax == status.sliceMax
                 && current == status.current
-                && rowsEmitted == status.rowsEmitted;
+                && rowsEmitted == status.rowsEmitted
+                && partitioningStrategies.equals(status.partitioningStrategies);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted);
+            return Objects.hash(
+                processedSlices,
+                sliceIndex,
+                totalSlices,
+                pagesEmitted,
+                sliceMin,
+                sliceMax,
+                current,
+                rowsEmitted,
+                partitioningStrategies
+            );
         }
 
         @Override
@@ -468,17 +499,4 @@ public abstract class LuceneOperator extends SourceOperator {
             return TransportVersions.V_8_11_X;
         }
     }
-
-    static Function<ShardContext, Weight> weightFunction(Function<ShardContext, Query> queryFunction, ScoreMode scoreMode) {
-        return ctx -> {
-            final var query = queryFunction.apply(ctx);
-            final var searcher = ctx.searcher();
-            try {
-                Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
-                return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            }
-        };
-    }
 }

+ 179 - 55
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java

@@ -7,17 +7,25 @@
 
 package org.elasticsearch.compute.lucene;
 
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.core.Nullable;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Function;
@@ -26,15 +34,17 @@ import java.util.function.Function;
  * Shared Lucene slices between Lucene operators.
  */
 public final class LuceneSliceQueue {
-    private static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
-    private static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
+    public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
+    public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
 
     private final int totalSlices;
     private final Queue<LuceneSlice> slices;
+    private final Map<String, PartitioningStrategy> partitioningStrategies;
 
-    private LuceneSliceQueue(List<LuceneSlice> slices) {
+    private LuceneSliceQueue(List<LuceneSlice> slices, Map<String, PartitioningStrategy> partitioningStrategies) {
         this.totalSlices = slices.size();
         this.slices = new ConcurrentLinkedQueue<>(slices);
+        this.partitioningStrategies = partitioningStrategies;
     }
 
     @Nullable
@@ -46,82 +56,196 @@ public final class LuceneSliceQueue {
         return totalSlices;
     }
 
+    /**
+     * Strategy used to partition each shard in this queue.
+     */
+    public Map<String, PartitioningStrategy> partitioningStrategies() {
+        return partitioningStrategies;
+    }
+
     public Collection<String> remainingShardsIdentifiers() {
         return slices.stream().map(slice -> slice.shardContext().shardIdentifier()).toList();
     }
 
     public static LuceneSliceQueue create(
         List<? extends ShardContext> contexts,
-        Function<ShardContext, Weight> weightFunction,
+        Function<ShardContext, Query> queryFunction,
         DataPartitioning dataPartitioning,
-        int taskConcurrency
+        Function<Query, PartitioningStrategy> autoStrategy,
+        int taskConcurrency,
+        ScoreMode scoreMode
     ) {
-        final List<LuceneSlice> slices = new ArrayList<>();
+        List<LuceneSlice> slices = new ArrayList<>();
+        Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());
         for (ShardContext ctx : contexts) {
-            final List<LeafReaderContext> leafContexts = ctx.searcher().getLeafContexts();
-            List<List<PartialLeafReaderContext>> groups = switch (dataPartitioning) {
-                case SHARD -> Collections.singletonList(leafContexts.stream().map(PartialLeafReaderContext::new).toList());
-                case SEGMENT -> segmentSlices(leafContexts);
-                case DOC -> docSlices(ctx.searcher().getIndexReader(), taskConcurrency);
-            };
-            final Weight weight = weightFunction.apply(ctx);
+            Query query = queryFunction.apply(ctx);
+            query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
+            /*
+             * Rewrite the query on the local index so things like fully
+             * overlapping range queries become match all. It's important
+             * to do this before picking the partitioning strategy so we
+             * can pick more aggressive strategies when the query rewrites
+             * into MatchAll.
+             */
+            try {
+                query = ctx.searcher().rewrite(query);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+            PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
+            partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
+            List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
+            Weight weight = weight(ctx, query, scoreMode);
             for (List<PartialLeafReaderContext> group : groups) {
                 if (group.isEmpty() == false) {
                     slices.add(new LuceneSlice(ctx, group, weight));
                 }
             }
         }
-        return new LuceneSliceQueue(slices);
+        return new LuceneSliceQueue(slices, partitioningStrategies);
     }
 
-    static List<List<PartialLeafReaderContext>> docSlices(IndexReader indexReader, int numSlices) {
-        final int totalDocCount = indexReader.maxDoc();
-        final int normalMaxDocsPerSlice = totalDocCount / numSlices;
-        final int extraDocsInFirstSlice = totalDocCount % numSlices;
-        final List<List<PartialLeafReaderContext>> slices = new ArrayList<>();
-        int docsAllocatedInCurrentSlice = 0;
-        List<PartialLeafReaderContext> currentSlice = null;
-        int maxDocsPerSlice = normalMaxDocsPerSlice + extraDocsInFirstSlice;
-        for (LeafReaderContext ctx : indexReader.leaves()) {
-            final int numDocsInLeaf = ctx.reader().maxDoc();
-            int minDoc = 0;
-            while (minDoc < numDocsInLeaf) {
-                int numDocsToUse = Math.min(maxDocsPerSlice - docsAllocatedInCurrentSlice, numDocsInLeaf - minDoc);
-                if (numDocsToUse <= 0) {
-                    break;
-                }
-                if (currentSlice == null) {
-                    currentSlice = new ArrayList<>();
+    /**
+     * Strategy used to partition each shard into slices. See {@link DataPartitioning}
+     * for descriptions on how each value works.
+     */
+    public enum PartitioningStrategy implements Writeable {
+        /**
+         * See {@link DataPartitioning#SHARD}.
+         */
+        SHARD(0) {
+            @Override
+            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requestedNumSlices) {
+                return List.of(searcher.getLeafContexts().stream().map(PartialLeafReaderContext::new).toList());
+            }
+        },
+        /**
+         * See {@link DataPartitioning#SEGMENT}.
+         */
+        SEGMENT(1) {
+            @Override
+            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requestedNumSlices) {
+                IndexSearcher.LeafSlice[] gs = IndexSearcher.slices(
+                    searcher.getLeafContexts(),
+                    MAX_DOCS_PER_SLICE,
+                    MAX_SEGMENTS_PER_SLICE,
+                    false
+                );
+                return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList();
+            }
+        },
+        /**
+         * See {@link DataPartitioning#DOC}.
+         */
+        DOC(2) {
+            @Override
+            List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requestedNumSlices) {
+                final int totalDocCount = searcher.getIndexReader().maxDoc();
+                final int normalMaxDocsPerSlice = totalDocCount / requestedNumSlices;
+                final int extraDocsInFirstSlice = totalDocCount % requestedNumSlices;
+                final List<List<PartialLeafReaderContext>> slices = new ArrayList<>();
+                int docsAllocatedInCurrentSlice = 0;
+                List<PartialLeafReaderContext> currentSlice = null;
+                int maxDocsPerSlice = normalMaxDocsPerSlice + extraDocsInFirstSlice;
+                for (LeafReaderContext ctx : searcher.getLeafContexts()) {
+                    final int numDocsInLeaf = ctx.reader().maxDoc();
+                    int minDoc = 0;
+                    while (minDoc < numDocsInLeaf) {
+                        int numDocsToUse = Math.min(maxDocsPerSlice - docsAllocatedInCurrentSlice, numDocsInLeaf - minDoc);
+                        if (numDocsToUse <= 0) {
+                            break;
+                        }
+                        if (currentSlice == null) {
+                            currentSlice = new ArrayList<>();
+                        }
+                        currentSlice.add(new PartialLeafReaderContext(ctx, minDoc, minDoc + numDocsToUse));
+                        minDoc += numDocsToUse;
+                        docsAllocatedInCurrentSlice += numDocsToUse;
+                        if (docsAllocatedInCurrentSlice == maxDocsPerSlice) {
+                            slices.add(currentSlice);
+                            // once the first slice with the extra docs is added, no need for extra docs
+                            maxDocsPerSlice = normalMaxDocsPerSlice;
+                            currentSlice = null;
+                            docsAllocatedInCurrentSlice = 0;
+                        }
+                    }
                 }
-                currentSlice.add(new PartialLeafReaderContext(ctx, minDoc, minDoc + numDocsToUse));
-                minDoc += numDocsToUse;
-                docsAllocatedInCurrentSlice += numDocsToUse;
-                if (docsAllocatedInCurrentSlice == maxDocsPerSlice) {
+                if (currentSlice != null) {
                     slices.add(currentSlice);
-                    maxDocsPerSlice = normalMaxDocsPerSlice; // once the first slice with the extra docs is added, no need for extra docs
-                    currentSlice = null;
-                    docsAllocatedInCurrentSlice = 0;
                 }
+                if (requestedNumSlices < totalDocCount && slices.size() != requestedNumSlices) {
+                    throw new IllegalStateException("wrong number of slices, expected " + requestedNumSlices + " but got " + slices.size());
+                }
+                if (slices.stream()
+                    .flatMapToInt(
+                        l -> l.stream()
+                            .mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc())
+                    )
+                    .sum() != totalDocCount) {
+                    throw new IllegalStateException("wrong doc count");
+                }
+                return slices;
             }
+        };
+
+        private final byte id;
+
+        PartitioningStrategy(int id) {
+            this.id = (byte) id;
         }
-        if (currentSlice != null) {
-            slices.add(currentSlice);
+
+        public static PartitioningStrategy readFrom(StreamInput in) throws IOException {
+            int id = in.readByte();
+            return switch (id) {
+                case 0 -> SHARD;
+                case 1 -> SEGMENT;
+                case 2 -> DOC;
+                default -> throw new IllegalArgumentException("invalid PartitioningStrategyId [" + id + "]");
+            };
         }
-        if (numSlices < totalDocCount && slices.size() != numSlices) {
-            throw new IllegalStateException("wrong number of slices, expected " + numSlices + " but got " + slices.size());
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeByte(id);
         }
-        if (slices.stream()
-            .flatMapToInt(
-                l -> l.stream().mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc())
-            )
-            .sum() != totalDocCount) {
-            throw new IllegalStateException("wrong doc count");
+
+        abstract List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int requestedNumSlices);
+
+        private static PartitioningStrategy pick(
+            DataPartitioning dataPartitioning,
+            Function<Query, PartitioningStrategy> autoStrategy,
+            ShardContext ctx,
+            Query query
+        ) {
+            return switch (dataPartitioning) {
+                case SHARD -> PartitioningStrategy.SHARD;
+                case SEGMENT -> PartitioningStrategy.SEGMENT;
+                case DOC -> PartitioningStrategy.DOC;
+                case AUTO -> forAuto(autoStrategy, ctx, query);
+            };
+        }
+
+        /**
+         * {@link DataPartitioning#AUTO} resolves to {@link #SHARD} for indices
+         * with fewer than this many documents.
+         */
+        private static final int SMALL_INDEX_BOUNDARY = MAX_DOCS_PER_SLICE;
+
+        private static PartitioningStrategy forAuto(Function<Query, PartitioningStrategy> autoStrategy, ShardContext ctx, Query query) {
+            if (ctx.searcher().getIndexReader().maxDoc() < SMALL_INDEX_BOUNDARY) {
+                return PartitioningStrategy.SHARD;
+            }
+            return autoStrategy.apply(query);
         }
-        return slices;
     }
 
-    static List<List<PartialLeafReaderContext>> segmentSlices(List<LeafReaderContext> leafContexts) {
-        IndexSearcher.LeafSlice[] gs = IndexSearcher.slices(leafContexts, MAX_DOCS_PER_SLICE, MAX_SEGMENTS_PER_SLICE, false);
-        return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList();
+    static Weight weight(ShardContext ctx, Query query, ScoreMode scoreMode) {
+        var searcher = ctx.searcher();
+        try {
+            Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
+            return searcher.createWeight(actualQuery, scoreMode, 1);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
     }
 }
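
With the new signature, callers hand over the raw query and an auto-strategy function instead of a prebuilt `Weight`. A hypothetical call site (here `contexts` and `taskConcurrency` stand in for whatever the operator factory passes):
```
// Hypothetical call site; contexts and taskConcurrency come from the factory.
LuceneSliceQueue queue = LuceneSliceQueue.create(
    contexts,
    ctx -> new MatchAllDocsQuery(),                                      // queryFunction
    DataPartitioning.AUTO,
    LuceneSourceOperator.Factory.autoStrategy(LuceneOperator.NO_LIMIT),  // autoStrategy
    taskConcurrency,
    ScoreMode.COMPLETE_NO_SCORES
);
// The strategy chosen for each shard is recorded and surfaced in the operator status:
Map<String, LuceneSliceQueue.PartitioningStrategy> chosen = queue.partitioningStrategies();
```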

+ 122 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

@@ -7,8 +7,14 @@
 
 package org.elasticsearch.compute.lucene;
 
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.BoostQuery;
 import org.apache.lucene.search.CollectionTerminatedException;
+import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
 import org.elasticsearch.compute.data.BlockFactory;
@@ -17,22 +23,30 @@ import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Limiter;
 import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Function;
 
 import static org.apache.lucene.search.ScoreMode.COMPLETE;
 import static org.apache.lucene.search.ScoreMode.COMPLETE_NO_SCORES;
+import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.DOC;
+import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.SEGMENT;
+import static org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy.SHARD;
 
 /**
  * Source operator that incrementally runs Lucene searches
  */
 public class LuceneSourceOperator extends LuceneOperator {
+    private static final Logger log = LogManager.getLogger(LuceneSourceOperator.class);
 
     private int currentPagePos = 0;
     private int remainingDocs;
@@ -59,11 +73,13 @@ public class LuceneSourceOperator extends LuceneOperator {
         ) {
             super(
                 contexts,
-                weightFunction(queryFunction, needsScore ? COMPLETE : COMPLETE_NO_SCORES),
+                queryFunction,
                 dataPartitioning,
+                autoStrategy(limit),
                 taskConcurrency,
                 limit,
-                needsScore
+                needsScore,
+                needsScore ? COMPLETE : COMPLETE_NO_SCORES
             );
             this.maxPageSize = maxPageSize;
             // TODO: use a single limiter for multiple stage execution
@@ -91,6 +107,110 @@ public class LuceneSourceOperator extends LuceneOperator {
                 + needsScore
                 + "]";
         }
+
+        /**
+         * Pick a strategy for the {@link DataPartitioning#AUTO} partitioning.
+         */
+        public static Function<Query, PartitioningStrategy> autoStrategy(int limit) {
+            return limit == NO_LIMIT ? Factory::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy;
+        }
+
+        /**
+         * Use the {@link PartitioningStrategy#SHARD} strategy because
+         * it has the lowest overhead. Used when there is a {@code limit} on the operator
+         * because that's for cases like {@code FROM foo | LIMIT 10} or
+         * {@code FROM foo | WHERE a == 1 | LIMIT 10} when the {@code WHERE} can be pushed
+         * to Lucene. In those cases we're better off with the lowest overhead we can
+         * manage - and that's {@link PartitioningStrategy#SHARD}.
+         */
+        private static PartitioningStrategy lowOverheadAutoStrategy(Query query) {
+            return SHARD;
+        }
+
+        /**
+         * Select the {@link PartitioningStrategy} based on the {@link Query}.
+         * <ul>
+         *     <li>
+         *         If the {@linkplain Query} matches <strong>no</strong> documents then this will
+         *         use the {@link PartitioningStrategy#SHARD} strategy so we minimize the overhead
+         *         of finding nothing.
+         *     </li>
+         *     <li>
+         *         If the {@linkplain Query} matches <strong>all</strong> documents then this will
+         *         use the {@link PartitioningStrategy#DOC} strategy because the overhead of using
+         *         that strategy for {@link MatchAllDocsQuery} is very low, and we need as many CPUs
+         *         as we can get to process all the documents.
+         *     </li>
+         *     <li>
+         *         Otherwise use the {@link PartitioningStrategy#SEGMENT} strategy because it's
+         *         overhead is generally low.
+         *     </li>
+         * </ul>
+         */
+        private static PartitioningStrategy highSpeedAutoStrategy(Query query) {
+            Query unwrapped = unwrap(query);
+            log.trace("highSpeedAutoStrategy {} {}", query, unwrapped);
+            return switch (unwrapped) {
+                case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q);
+                case MatchAllDocsQuery q -> DOC;
+                case MatchNoDocsQuery q -> SHARD;
+                default -> SEGMENT;
+            };
+        }
+
+        private static Query unwrap(Query query) {
+            while (true) {
+                switch (query) {
+                    case BoostQuery q: {
+                        query = q.getQuery();
+                        break;
+                    }
+                    case ConstantScoreQuery q: {
+                        query = q.getQuery();
+                        break;
+                    }
+                    default:
+                        return query;
+                }
+            }
+        }
+
+        /**
+         * Select the {@link PartitioningStrategy} for a {@link BooleanQuery}.
+         * <ul>
+         *     <li>
+         *         If the query can't match anything, returns {@link PartitioningStrategy#SHARD}.
+         *     </li>
+         *
+         * </ul>
+         */
+        private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) {
+            List<PartitioningStrategy> clauses = new ArrayList<>(query.clauses().size());
+            boolean allRequired = true;
+            for (BooleanClause c : query) {
+                Query clauseQuery = unwrap(c.query());
+                log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery);
+                if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery)
+                    || (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) {
+                    // Can't match anything
+                    return SHARD;
+                }
+                allRequired &= c.isRequired();
+                clauses.add(highSpeedAutoStrategy(clauseQuery));
+            }
+            log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses);
+            if (allRequired == false) {
+                return SEGMENT;
+            }
+            if (clauses.stream().anyMatch(s -> s == SHARD)) {
+                return SHARD;
+            }
+            if (clauses.stream().anyMatch(s -> s == SEGMENT)) {
+                return SEGMENT;
+            }
+            assert clauses.stream().allMatch(s -> s == DOC);
+            return DOC;
+        }
     }
 
     @SuppressWarnings("this-escape")
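
To summarize the heuristic's effect, a few calls and the strategies they resolve to (these match the expectations in `LuceneSourceOperatorAutoStrategyTests` below):
```
Function<Query, LuceneSliceQueue.PartitioningStrategy> auto =
    LuceneSourceOperator.Factory.autoStrategy(LuceneOperator.NO_LIMIT);
auto.apply(new MatchAllDocsQuery());           // -> DOC
auto.apply(new MatchNoDocsQuery());            // -> SHARD
auto.apply(new TermQuery(new Term("a", "a"))); // -> SEGMENT

// Any finite limit picks the low-overhead strategy regardless of the query:
LuceneSourceOperator.Factory.autoStrategy(10).apply(new MatchAllDocsQuery()); // -> SHARD
```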

+ 13 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

@@ -44,6 +44,9 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.lucene.search.ScoreMode.TOP_DOCS;
+import static org.apache.lucene.search.ScoreMode.TOP_DOCS_WITH_SCORES;
+
 /**
  * Source operator that builds Pages out of the output of a TopFieldCollector (aka TopN)
  */
@@ -63,7 +66,16 @@ public final class LuceneTopNSourceOperator extends LuceneOperator {
             List<SortBuilder<?>> sorts,
             boolean needsScore
         ) {
-            super(contexts, weightFunction(queryFunction, sorts, needsScore), dataPartitioning, taskConcurrency, limit, needsScore);
+            super(
+                contexts,
+                queryFunction,
+                dataPartitioning,
+                query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+                taskConcurrency,
+                limit,
+                needsScore,
+                needsScore ? TOP_DOCS_WITH_SCORES : TOP_DOCS
+            );
             this.maxPageSize = maxPageSize;
             this.sorts = sorts;
         }

+ 10 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java

@@ -33,8 +33,6 @@ import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.function.Function;
 
-import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;
-
 /**
  * Creates a source operator that takes advantage of the natural sorting of segments in a tsdb index.
  * <p>
@@ -58,7 +56,16 @@ public class TimeSeriesSortedSourceOperatorFactory extends LuceneOperator.Factor
         int maxPageSize,
         int limit
     ) {
-        super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), DataPartitioning.SHARD, taskConcurrency, limit, false);
+        super(
+            contexts,
+            queryFunction,
+            DataPartitioning.SHARD,
+            query -> { throw new UnsupportedOperationException("locked to SHARD partitioning"); },
+            taskConcurrency,
+            limit,
+            false,
+            ScoreMode.COMPLETE_NO_SCORES
+        );
         this.maxPageSize = maxPageSize;
     }
 

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java

@@ -100,7 +100,7 @@ public class LuceneCountOperatorTests extends AnyOperatorTestCase {
 
     @Override
     protected Matcher<String> expectedDescriptionOfSimple() {
-        return matchesRegex("LuceneCountOperator\\[dataPartitioning = (DOC|SHARD|SEGMENT), limit = 100]");
+        return matchesRegex("LuceneCountOperator\\[dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), limit = 100]");
     }
 
     // TODO tests for the other data partitioning configurations

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java

@@ -203,7 +203,7 @@ public abstract class LuceneMaxOperatorTestCase extends AnyOperatorTestCase {
         return matchesRegex(
             "LuceneMaxOperator\\[type = "
                 + getNumberType().name()
-                + ", dataPartitioning = (DOC|SHARD|SEGMENT), fieldName = "
+                + ", dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), fieldName = "
                 + FIELD_NAME
                 + ", limit = 100]"
         );

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java

@@ -203,7 +203,7 @@ public abstract class LuceneMinOperatorTestCase extends AnyOperatorTestCase {
         return matchesRegex(
             "LuceneMinOperator\\[type = "
                 + getNumberType().name()
-                + ", dataPartitioning = (DOC|SHARD|SEGMENT), fieldName = "
+                + ", dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), fieldName = "
                 + FIELD_NAME
                 + ", limit = 100]"
         );

+ 127 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorAutoStrategyTests.java

@@ -0,0 +1,127 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.BoostQuery;
+import org.apache.lucene.search.ConstantScoreQuery;
+import org.apache.lucene.search.IndexOrDocValuesQuery;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.MatchNoDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Tests for the {@link LuceneSourceOperator.Factory#autoStrategy} method. It picks
+ * the strategy based on complex rules around the query.
+ */
+public class LuceneSourceOperatorAutoStrategyTests extends ESTestCase {
+    @ParametersFactory(argumentFormatting = "%s -> %s")
+    public static Iterable<Object[]> parameters() {
+        return List.of(
+            new Object[] { new MatchAllDocsQuery(), LuceneSliceQueue.PartitioningStrategy.DOC },
+            new Object[] { new ConstantScoreQuery(new MatchAllDocsQuery()), LuceneSliceQueue.PartitioningStrategy.DOC },
+
+            /*
+             * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00Z\" | ....
+             * when all @timestamps are in range
+             */
+            new Object[] {
+                new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F),
+                LuceneSliceQueue.PartitioningStrategy.DOC },
+            new Object[] { new MatchNoDocsQuery(), LuceneSliceQueue.PartitioningStrategy.SHARD },
+
+            /*
+             * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00Z\" | STATS SUM(b)
+             * when all @timestamps are in range and all docs have b.
+             */
+            new Object[] {
+                new BooleanQuery.Builder() // formatter
+                    .add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST)
+                    .add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST)
+                    .build(),
+                LuceneSliceQueue.PartitioningStrategy.DOC },
+
+            /*
+             * FROM test | WHERE @timestamp > \"2025-01-01T00:00:00.120Z\" | STATS SUM(b)
+             * when *some* @timestamps are in range and all docs have b
+             */
+            new Object[] {
+                new BooleanQuery.Builder().add(
+                    new BoostQuery(
+                        new ConstantScoreQuery(
+                            new IndexOrDocValuesQuery(
+                                LongPoint.newRangeQuery("@timestamp", 1735689600121L, 9223372036854775807L),
+                                SortedNumericDocValuesField.newSlowRangeQuery("@timestamp", 1735689600121L, 9223372036854775807L)
+                            )
+                        ),
+                        0.0F
+                    ),
+                    BooleanClause.Occur.MUST
+                ).add(new BoostQuery(new ConstantScoreQuery(new MatchAllDocsQuery()), 0.0F), BooleanClause.Occur.MUST).build(),
+                LuceneSliceQueue.PartitioningStrategy.SEGMENT },
+
+            new Object[] {
+                new BooleanQuery.Builder() // formatter
+                    .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST)
+                    .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD)
+                    .build(),
+                LuceneSliceQueue.PartitioningStrategy.SEGMENT },
+            new Object[] {
+                new BooleanQuery.Builder() // formatter
+                    .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD)
+                    .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST_NOT)
+                    .build(),
+                LuceneSliceQueue.PartitioningStrategy.SHARD },
+            new Object[] {
+                new BooleanQuery.Builder() // formatter
+                    .add(new MatchAllDocsQuery(), BooleanClause.Occur.SHOULD)
+                    .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD)
+                    .build(),
+                LuceneSliceQueue.PartitioningStrategy.SEGMENT },
+            new Object[] {
+                new BooleanQuery.Builder() // formatter
+                    .add(new MatchNoDocsQuery(), BooleanClause.Occur.SHOULD)
+                    .add(new TermQuery(new Term("a", "a")), BooleanClause.Occur.SHOULD)
+                    .build(),
+                LuceneSliceQueue.PartitioningStrategy.SEGMENT }
+        );
+    }
+
+    private final Query query;
+    private final LuceneSliceQueue.PartitioningStrategy expectedUnlimited;
+
+    public LuceneSourceOperatorAutoStrategyTests(Query query, LuceneSliceQueue.PartitioningStrategy expectedUnlimited) {
+        this.query = query;
+        this.expectedUnlimited = expectedUnlimited;
+    }
+
+    public void testAutoStrategyLimited() {
+        Function<Query, LuceneSliceQueue.PartitioningStrategy> auto = LuceneSourceOperator.Factory.autoStrategy(
+            between(1, LuceneOperator.NO_LIMIT - 1)
+        );
+        assertThat(auto.apply(query), equalTo(LuceneSliceQueue.PartitioningStrategy.SHARD));
+    }
+
+    public void testAutoStrategyUnlimited() {
+        Function<Query, LuceneSliceQueue.PartitioningStrategy> auto = LuceneSourceOperator.Factory.autoStrategy(LuceneOperator.NO_LIMIT);
+        assertThat(auto.apply(query), equalTo(expectedUnlimited));
+    }
+}

+ 31 - 5
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java

@@ -12,7 +12,9 @@ import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -31,7 +33,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
             123,
             99990,
             8000,
-            222
+            222,
+            Map.of("b:0", LuceneSliceQueue.PartitioningStrategy.SHARD, "a:1", LuceneSliceQueue.PartitioningStrategy.DOC)
         );
     }
 
@@ -54,7 +57,11 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
               "slice_min" : 123,
               "slice_max" : 99990,
               "current" : 8000,
-              "rows_emitted" : 222
+              "rows_emitted" : 222,
+              "partitioning_strategies" : {
+                "a:1" : "DOC",
+                "b:0" : "SHARD"
+              }
             }""";
     }
 
@@ -80,7 +87,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
             randomNonNegativeInt(),
             randomNonNegativeInt(),
             randomNonNegativeInt(),
-            randomNonNegativeLong()
+            randomNonNegativeLong(),
+            randomPartitioningStrategies()
         );
     }
 
@@ -102,6 +110,18 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
         return set;
     }
 
+    private static Map<String, LuceneSliceQueue.PartitioningStrategy> randomPartitioningStrategies() {
+        int size = between(0, 10);
+        Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies = new HashMap<>();
+        while (partitioningStrategies.size() < size) {
+            partitioningStrategies.put(
+                randomAlphaOfLength(3) + ":" + between(0, 10),
+                randomFrom(LuceneSliceQueue.PartitioningStrategy.values())
+            );
+        }
+        return partitioningStrategies;
+    }
+
     @Override
     protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status instance) {
         int processedSlices = instance.processedSlices();
@@ -115,7 +135,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
         int sliceMax = instance.sliceMax();
         int current = instance.current();
         long rowsEmitted = instance.rowsEmitted();
-        switch (between(0, 10)) {
+        Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies = instance.partitioningStrategies();
+        switch (between(0, 11)) {
             case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt);
             case 1 -> processedQueries = randomValueOtherThan(processedQueries, LuceneSourceOperatorStatusTests::randomProcessedQueries);
             case 2 -> processedShards = randomValueOtherThan(processedShards, LuceneSourceOperatorStatusTests::randomProcessedShards);
@@ -127,6 +148,10 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
             case 8 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
             case 9 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
             case 10 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
+            case 11 -> partitioningStrategies = randomValueOtherThan(
+                partitioningStrategies,
+                LuceneSourceOperatorStatusTests::randomPartitioningStrategies
+            );
             default -> throw new UnsupportedOperationException();
         }
         return new LuceneSourceOperator.Status(
@@ -140,7 +165,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest
             sliceMin,
             sliceMax,
             current,
-            rowsEmitted
+            rowsEmitted,
+            partitioningStrategies
         );
     }
 }

+ 43 - 20
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

@@ -119,17 +119,34 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
     @Override
     protected Matcher<String> expectedDescriptionOfSimple() {
         return matchesRegex(
-            "LuceneSourceOperator"
-                + "\\[dataPartitioning = (DOC|SHARD|SEGMENT), maxPageSize = \\d+, limit = 100, needsScore = (true|false)]"
+            "LuceneSourceOperator\\["
+                + "dataPartitioning = (AUTO|DOC|SHARD|SEGMENT), "
+                + "maxPageSize = \\d+, "
+                + "limit = 100, "
+                + "needsScore = (true|false)]"
         );
     }
 
-    // TODO tests for the other data partitioning configurations
+    public void testAutoPartitioning() {
+        testSimple(DataPartitioning.AUTO);
+    }
+
+    public void testShardPartitioning() {
+        testSimple(DataPartitioning.SHARD);
+    }
 
-    public void testShardDataPartitioning() {
+    public void testSegmentPartitioning() {
+        testSimple(DataPartitioning.SEGMENT);
+    }
+
+    public void testDocPartitioning() {
+        testSimple(DataPartitioning.DOC);
+    }
+
+    private void testSimple(DataPartitioning partitioning) {
         int size = between(1_000, 20_000);
         int limit = between(10, size);
-        testSimple(driverContext(), size, limit);
+        testSimple(driverContext(), partitioning, size, limit);
     }
 
     public void testEarlyTermination() {
@@ -168,12 +185,12 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
     }
 
     public void testEmpty() {
-        testSimple(driverContext(), 0, between(10, 10_000));
+        testSimple(driverContext(), randomFrom(DataPartitioning.values()), 0, between(10, 10_000));
     }
 
-    public void testWithCranky() {
+    public void testEmptyWithCranky() {
         try {
-            testSimple(crankyDriverContext(), between(1, 10_000), 100);
+            testSimple(crankyDriverContext(), randomFrom(DataPartitioning.values()), 0, between(10, 10_000));
             logger.info("cranky didn't break");
         } catch (CircuitBreakingException e) {
             logger.info("broken", e);
@@ -181,21 +198,27 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
         }
     }
 
-    public void testEmptyWithCranky() {
-        try {
-            testSimple(crankyDriverContext(), 0, between(10, 10_000));
-            logger.info("cranky didn't break");
-        } catch (CircuitBreakingException e) {
-            logger.info("broken", e);
-            assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
-        }
+    public void testAutoPartitioningWithCranky() {
+        testWithCranky(DataPartitioning.AUTO);
+    }
+
+    public void testShardPartitioningWithCranky() {
+        testWithCranky(DataPartitioning.SHARD);
+    }
+
+    public void testSegmentPartitioningWithCranky() {
+        testWithCranky(DataPartitioning.SEGMENT);
+    }
+
+    public void testDocPartitioningWithCranky() {
+        testWithCranky(DataPartitioning.DOC);
     }
 
-    public void testShardDataPartitioningWithCranky() {
+    private void testWithCranky(DataPartitioning partitioning) {
         int size = between(1_000, 20_000);
         int limit = between(10, size);
         try {
-            testSimple(crankyDriverContext(), size, limit);
+            testSimple(crankyDriverContext(), partitioning, size, limit);
             logger.info("cranky didn't break");
         } catch (CircuitBreakingException e) {
             logger.info("broken", e);
@@ -203,8 +226,8 @@ public class LuceneSourceOperatorTests extends AnyOperatorTestCase {
         }
     }
 
-    private void testSimple(DriverContext ctx, int size, int limit) {
-        LuceneSourceOperator.Factory factory = simple(DataPartitioning.SHARD, size, limit, scoring);
+    private void testSimple(DriverContext ctx, DataPartitioning partitioning, int size, int limit) {
+        LuceneSourceOperator.Factory factory = simple(partitioning, size, limit, scoring);
         Operator.OperatorFactory readS = ValuesSourceReaderOperatorTests.factory(reader, S_FIELD, ElementType.LONG);
 
         List<Page> results = new ArrayList<>();
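
A quick orientation note: the description regex at the top of this diff now accepts `AUTO` alongside the concrete strategies. A minimal sketch of the shape the enum takes with the new constant (ordering assumed; the real class lives in `org.elasticsearch.compute.lucene` and carries full javadoc):

```
// Sketch only, assuming the enum simply gains an AUTO constant. AUTO is
// resolved to one of the concrete strategies per shard when the slices are
// built, and that resolved choice is what the profile's
// partitioning_strategies map reports.
public enum DataPartitioning {
    AUTO,
    DOC,
    SHARD,
    SEGMENT
}
```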

+ 229 - 0
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java

@@ -0,0 +1,229 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.qa.single_node;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.test.TestClustersThreadFilter;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.hamcrest.Matcher;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
+public class EsqlPartitioningIT extends ESRestTestCase {
+    /**
+     * The size of the large index we use to test partitioning. It has to be at
+     * least 250,000 documents for AUTO partitioning to pick anything but SHARD.
+     */
+    private static final int IDX_DOCS = 300_000;
+    /**
+     * The size of the small index we use to test partitioning. It's small enough
+     * that AUTO should always choose SHARD partitioning.
+     */
+    private static final int SMALL_IDX_DOCS = 20_000;
+
+    private static final long BASE_TIME = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z");
+    private static final int BULK_CHARS = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes());
+
+    record Case(String suffix, String idxPartition) {}
+
+    @ParametersFactory(argumentFormatting = "[%1$s] %3$s -> %4$s")
+    public static Iterable<Object[]> parameters() {
+        List<Object[]> params = new ArrayList<>();
+        for (String defaultDataPartitioning : new String[] { null, "shard", "segment", "doc", "auto" }) {
+            for (String index : new String[] { "idx", "small_idx" }) {
+                for (Case c : new Case[] {
+                    new Case("", "SHARD"),
+                    new Case("| SORT @timestamp ASC", "SHARD"),
+                    new Case("| WHERE ABS(a) == 1", "DOC"),
+                    new Case("| WHERE a == 1", "SHARD"),
+                    new Case("| STATS SUM(a)", "DOC"),
+                    new Case("| MV_EXPAND a | STATS SUM(a)", "DOC"),
+                    new Case("| WHERE a == 1 | STATS SUM(a)", "SEGMENT"),
+                    new Case("| WHERE a == 1 | MV_EXPAND a | STATS SUM(a)", "SEGMENT"), }) {
+                    params.add(
+                        new Object[] {
+                            defaultDataPartitioning,
+                            index,
+                            "FROM " + index + " " + c.suffix,
+                            expectedPartition(defaultDataPartitioning, index, c.idxPartition) }
+                    );
+                }
+            }
+        }
+        return params;
+    }
+
+    @ClassRule
+    public static ElasticsearchCluster cluster = Clusters.testCluster();
+
+    private final String defaultDataPartitioning;
+    private final String index;
+    private final String query;
+    private final Matcher<String> expectedPartition;
+
+    public EsqlPartitioningIT(String defaultDataPartitioning, String index, String query, Matcher<String> expectedPartition) {
+        this.defaultDataPartitioning = defaultDataPartitioning;
+        this.index = index;
+        this.query = query;
+        this.expectedPartition = expectedPartition;
+    }
+
+    public void test() throws IOException {
+        setupIndex(index, switch (index) {
+            case "idx" -> IDX_DOCS;
+            case "small_idx" -> SMALL_IDX_DOCS;
+            default -> throw new IllegalArgumentException("unknown index [" + index + "]");
+        });
+        setDefaultDataPartitioning(defaultDataPartitioning);
+        try {
+            assertThat(partitionForQuery(query), expectedPartition);
+        } finally {
+            setDefaultDataPartitioning(null);
+        }
+    }
+
+    private void setDefaultDataPartitioning(String defaultDataPartitioning) throws IOException {
+        Request request = new Request("PUT", "/_cluster/settings");
+        XContentBuilder builder = JsonXContent.contentBuilder().startObject();
+        builder.startObject("transient");
+        builder.field("esql.default_data_partitioning", defaultDataPartitioning);
+        builder.endObject();
+        request.setJsonEntity(Strings.toString(builder.endObject()));
+        int code = client().performRequest(request).getStatusLine().getStatusCode();
+        assertThat(code, equalTo(200));
+    }
+
+    private static Matcher<String> expectedPartition(String defaultDataPartitioning, String index, String idxPartition) {
+        return switch (defaultDataPartitioning) {
+            case null -> expectedAutoPartition(index, idxPartition);
+            case "auto" -> expectedAutoPartition(index, idxPartition);
+            default -> equalTo(defaultDataPartitioning.toUpperCase(Locale.ROOT));
+        };
+    }
+
+    private static Matcher<String> expectedAutoPartition(String index, String idxPartition) {
+        return equalTo(switch (index) {
+            case "idx" -> idxPartition;
+            case "small_idx" -> "SHARD";
+            default -> throw new UnsupportedOperationException("unknown index [" + index + "]");
+        });
+    }
+
+    private String partitionForQuery(String query) throws IOException {
+        Request request = new Request("POST", "_query");
+        request.addParameter("pretty", "");
+        request.addParameter("error_trace", "");
+        XContentBuilder b = JsonXContent.contentBuilder().startObject();
+        b.field("query", query);
+        b.field("profile", true);
+        request.setJsonEntity(Strings.toString(b.endObject()));
+        request.setOptions(request.getOptions().toBuilder().setWarningsHandler(w -> {
+            w.remove("No limit defined, adding default limit of [1000]");
+            return w.isEmpty() == false;
+        }));
+        String response = EntityUtils.toString(client().performRequest(request).getEntity());
+        logger.info("Response: {}", response);
+        Map<?, ?> profile = (Map<?, ?>) XContentHelper.convertToMap(JsonXContent.jsonXContent, response, true).get("profile");
+        List<?> drivers = (List<?>) profile.get("drivers");
+        for (Object dItem : drivers) {
+            Map<?, ?> d = (Map<?, ?>) dItem;
+            if (false == "data".equals(d.get("description"))) {
+                continue;
+            }
+            List<?> operators = (List<?>) d.get("operators");
+            for (Object oItem : operators) {
+                Map<?, ?> o = (Map<?, ?>) oItem;
+                String operator = o.get("operator").toString();
+                if (false == operator.startsWith("Lucene")) {
+                    continue;
+                }
+                Map<?, ?> status = (Map<?, ?>) o.get("status");
+                Map<?, ?> partitioningStrategies = (Map<?, ?>) status.get("partitioning_strategies");
+                String strat = (String) partitioningStrategies.get(index + ":0");
+                if (strat == null) {
+                    throw new IllegalArgumentException("didn't find partition strategy for: " + o);
+                }
+                return strat;
+            }
+        }
+        throw new IllegalArgumentException("didn't find partition strategy for: " + drivers);
+    }
+
+    private void setupIndex(String index, int docs) throws IOException {
+        Response exists = client().performRequest(new Request("HEAD", "/" + index));
+        if (exists.getStatusLine().getStatusCode() == 200) {
+            return;
+        }
+        Request create = new Request("PUT", index);
+        create.setJsonEntity("""
+            {
+              "settings": {
+                "index": {
+                  "number_of_shards": 1
+                }
+              }
+            }""");  // Use a single shard to get consistent results.
+        client().performRequest(create);
+        StringBuilder bulk = new StringBuilder();
+        for (int d = 0; d < docs; d++) {
+            bulk.append("{\"index\":{}}\n");
+            bulk.append("{\"@timestamp\": \"");
+            bulk.append(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(BASE_TIME + d));
+            bulk.append("\", \"a\": ");
+            bulk.append(d % 10);
+            bulk.append("}\n");
+            if (bulk.length() > BULK_CHARS) {
+                logger.info("indexing {}: {}", index, d);
+                bulk(index, bulk.toString());
+                bulk.setLength(0);
+            }
+        }
+        logger.info("indexing {}: {}", index, docs);
+        bulk(index, bulk.toString());
+        client().performRequest(new Request("POST", "/" + index + "/_refresh"));
+    }
+
+    private void bulk(String index, String bulk) throws IOException {
+        Request request = new Request("POST", index + "/_bulk");
+        request.setJsonEntity(bulk);
+        Response response = client().performRequest(request);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new AssertionError("error with bulk: " + EntityUtils.toString(response.getEntity()));
+        }
+    }
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+
+    @Override
+    protected boolean preserveClusterUponCompletion() {
+        return true;
+    }
+}
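
For context, a hedged sketch of the `profile` fragment that `partitionForQuery` above walks. The field names (`drivers`, `description`, `operators`, `operator`, `status`, `partitioning_strategies`) come from the parsing code; the surrounding structure is abbreviated and the values are illustrative:

```
{
  "profile": {
    "drivers": [ {
      "description": "data",
      "operators": [ {
        "operator": "LuceneSourceOperator[...]",
        "status": {
          "partitioning_strategies": { "idx:0": "DOC" }
        }
      } ]
    } ]
  }
}
```

The `"idx:0"` key is the `<index>:<shard>` identifier built by `DefaultShardContext.shardIdentifier()` in the `EsPhysicalOperationProviders` diff below.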

+ 2 - 1
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -687,7 +687,8 @@ public class RestEsqlIT extends RestEsqlTestCase {
                 .entry("pages_emitted", greaterThan(0))
                 .entry("rows_emitted", greaterThan(0))
                 .entry("process_nanos", greaterThan(0))
-                .entry("processed_queries", List.of("*:*"));
+                .entry("processed_queries", List.of("*:*"))
+                .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD"));
             case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk());
             case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0))
                 .entry("rows_received", greaterThan(0))

+ 16 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -17,6 +17,7 @@ import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneCountOperator;
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
@@ -101,10 +102,17 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
     }
 
     private final List<ShardContext> shardContexts;
+    private final DataPartitioning defaultDataPartitioning;
 
-    public EsPhysicalOperationProviders(FoldContext foldContext, List<ShardContext> shardContexts, AnalysisRegistry analysisRegistry) {
+    public EsPhysicalOperationProviders(
+        FoldContext foldContext,
+        List<ShardContext> shardContexts,
+        AnalysisRegistry analysisRegistry,
+        DataPartitioning defaultDataPartitioning
+    ) {
         super(foldContext, analysisRegistry);
         this.shardContexts = shardContexts;
+        this.defaultDataPartitioning = defaultDataPartitioning;
     }
 
     @Override
@@ -199,7 +207,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
             luceneFactory = new LuceneTopNSourceOperator.Factory(
                 shardContexts,
                 querySupplier(esQueryExec.query()),
-                context.queryPragmas().dataPartitioning(),
+                context.queryPragmas().dataPartitioning(defaultDataPartitioning),
                 context.queryPragmas().taskConcurrency(),
                 context.pageSize(rowEstimatedSize),
                 limit,
@@ -219,7 +227,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
                 luceneFactory = new LuceneSourceOperator.Factory(
                     shardContexts,
                     querySupplier(esQueryExec.query()),
-                    context.queryPragmas().dataPartitioning(),
+                    context.queryPragmas().dataPartitioning(defaultDataPartitioning),
                     context.queryPragmas().taskConcurrency(),
                     context.pageSize(rowEstimatedSize),
                     limit,
@@ -241,7 +249,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         return new LuceneCountOperator.Factory(
             shardContexts,
             querySupplier(queryBuilder),
-            context.queryPragmas().dataPartitioning(),
+            context.queryPragmas().dataPartitioning(defaultDataPartitioning),
             context.queryPragmas().taskConcurrency(),
             limit == null ? NO_LIMIT : (Integer) limit.fold(context.foldCtx())
         );
@@ -278,11 +286,14 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         private final int index;
         private final SearchExecutionContext ctx;
         private final AliasFilter aliasFilter;
+        private final String shardIdentifier;
 
         public DefaultShardContext(int index, SearchExecutionContext ctx, AliasFilter aliasFilter) {
             this.index = index;
             this.ctx = ctx;
             this.aliasFilter = aliasFilter;
+            // Build the shardIdentifier once up front so we can reuse references to it in many places.
+            this.shardIdentifier = ctx.getFullyQualifiedIndex().getName() + ":" + ctx.getShardId();
         }
 
         @Override
@@ -302,7 +313,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
 
         @Override
         public String shardIdentifier() {
-            return ctx.getFullyQualifiedIndex().getName() + ":" + ctx.getShardId();
+            return shardIdentifier;
         }
 
         @Override

+ 10 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -17,6 +17,7 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
@@ -133,6 +134,8 @@ public class ComputeService {
     private final ClusterComputeHandler clusterComputeHandler;
     private final ExchangeService exchangeService;
 
+    private volatile DataPartitioning defaultDataPartitioning;
+
     @SuppressWarnings("this-escape")
     public ComputeService(
         TransportActionServices transportActionServices,
@@ -161,6 +164,7 @@ public class ComputeService {
             esqlExecutor,
             dataNodeComputeHandler
         );
+        clusterService.getClusterSettings().initializeAndWatch(EsqlPlugin.DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
     }
 
     public void execute(
@@ -429,7 +433,12 @@ public class ComputeService {
                 enrichLookupService,
                 lookupFromIndexService,
                 inferenceRunner,
-                new EsPhysicalOperationProviders(context.foldCtx(), contexts, searchService.getIndicesService().getAnalysis()),
+                new EsPhysicalOperationProviders(
+                    context.foldCtx(),
+                    contexts,
+                    searchService.getIndicesService().getAnalysis(),
+                    defaultDataPartitioning
+                ),
                 contexts
             );
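
The last added line uses the watch-a-dynamic-setting pattern. A simplified sketch of what it does, with hypothetical plumbing (the real method lives on `ClusterSettings`): seed the consumer with the current value, then re-run it on every update. This is also why `defaultDataPartitioning` is declared `volatile` above: query threads read it concurrently with the thread applying settings updates.

```
import java.util.function.Consumer;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;

// Simplified sketch, not the real ClusterSettings implementation.
static <T> void initializeAndWatch(ClusterService clusterService, Setting<T> setting, Consumer<T> consumer) {
    consumer.accept(setting.get(clusterService.getSettings()));                       // seed with the current value
    clusterService.getClusterSettings().addSettingsUpdateConsumer(setting, consumer); // follow dynamic updates
}
```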
 

+ 11 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.util.FeatureFlag;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockFactoryProvider;
+import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
@@ -154,6 +155,14 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
         Setting.Property.Dynamic
     );
 
+    public static final Setting<DataPartitioning> DEFAULT_DATA_PARTITIONING = Setting.enumSetting(
+        DataPartitioning.class,
+        "esql.default_data_partitioning",
+        DataPartitioning.AUTO,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
     @Override
     public Collection<?> createComponents(PluginServices services) {
         CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
@@ -216,7 +225,8 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
             ESQL_QUERYLOG_THRESHOLD_DEBUG_SETTING,
             ESQL_QUERYLOG_THRESHOLD_INFO_SETTING,
             ESQL_QUERYLOG_THRESHOLD_WARN_SETTING,
-            ESQL_QUERYLOG_INCLUDE_USER_SETTING
+            ESQL_QUERYLOG_INCLUDE_USER_SETTING,
+            DEFAULT_DATA_PARTITIONING
         );
     }
 

+ 16 - 7
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.core.TimeValue;
@@ -23,6 +24,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 
 import java.io.IOException;
+import java.util.Locale;
 import java.util.Objects;
 
 /**
@@ -38,11 +40,14 @@ public final class QueryPragmas implements Writeable {
         ThreadPool.searchOrGetThreadPoolSize(EsExecutors.allocatedProcessors(Settings.EMPTY))
     );
 
-    public static final Setting<DataPartitioning> DATA_PARTITIONING = Setting.enumSetting(
-        DataPartitioning.class,
-        "data_partitioning",
-        DataPartitioning.SEGMENT
-    );
+    /**
+     * How to cut each shard into {@link LuceneSliceQueue slices}. Parsed into
+     * the enum {@link DataPartitioning}, which has more documentation. Not an
+     * {@link Setting#enumSetting} because those can't have {@code null} defaults.
+     * An unset value here means "use the default from the cluster setting
+     * named {@link EsqlPlugin#DEFAULT_DATA_PARTITIONING}".
+     */
+    public static final Setting<String> DATA_PARTITIONING = Setting.simpleString("data_partitioning");
 
     /**
      * Size of a page in entries with {@code 0} being a special value asking
@@ -100,8 +105,12 @@ public final class QueryPragmas implements Writeable {
         return EXCHANGE_CONCURRENT_CLIENTS.get(settings);
     }
 
-    public DataPartitioning dataPartitioning() {
-        return DATA_PARTITIONING.get(settings);
+    public DataPartitioning dataPartitioning(DataPartitioning defaultDataPartitioning) {
+        String partitioning = DATA_PARTITIONING.get(settings);
+        if (partitioning.isEmpty()) {
+            return defaultDataPartitioning;
+        }
+        return DataPartitioning.valueOf(partitioning.toUpperCase(Locale.ROOT));
     }
 
     public int taskConcurrency() {
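
A short usage sketch of the new resolution order, assuming the existing `QueryPragmas(Settings)` constructor: an explicit per-request pragma still wins, while an unset pragma now defers to the cluster-level default rather than the old hard-coded `SEGMENT`:

```
// Hypothetical values; the resolution logic is dataPartitioning(...) above.
QueryPragmas explicit = new QueryPragmas(Settings.builder().put("data_partitioning", "doc").build());
assert explicit.dataPartitioning(DataPartitioning.AUTO) == DataPartitioning.DOC; // pragma wins

QueryPragmas unset = new QueryPragmas(Settings.EMPTY);
assert unset.dataPartitioning(DataPartitioning.AUTO) == DataPartitioning.AUTO;   // cluster default
```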

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.compute.aggregation.AggregatorMode;
+import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
 import org.elasticsearch.compute.test.TestBlockFactory;
@@ -7675,7 +7676,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             null,
             null,
             null,
-            new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null),
+            new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO),
             List.of()
         );
 

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

@@ -20,6 +20,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
@@ -256,7 +257,7 @@ public class LocalExecutionPlannerTests extends MapperServiceTestCase {
     }
 
     private EsPhysicalOperationProviders esPhysicalOperationProviders(List<EsPhysicalOperationProviders.ShardContext> shardContexts) {
-        return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null);
+        return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO);
     }
 
     private List<EsPhysicalOperationProviders.ShardContext> createShardContexts() throws IOException {