
ESQL: Speed loading stored fields (#127348) (#127721)

This speeds up loading from stored fields by opting more blocks into the
"sequential" strategy. This really kicks in when loading stored fields
like `text`, and when you need less than 100% of the documents but more
than, say, 10%. It's most useful when you need something like 99.9% of
the documents. Here are the perf numbers (`took` is in milliseconds,
before -> after):
```
%100.0 {"took": 403 -> 401,"documents_found":1000000}
%099.9 {"took":3990 -> 436,"documents_found": 999000}
%099.0 {"took":4069 -> 440,"documents_found": 990000}
%090.0 {"took":3468 -> 421,"documents_found": 900000}
%030.0 {"took":1213 -> 152,"documents_found": 300000}
%020.0 {"took": 766 -> 104,"documents_found": 200000}
%010.0 {"took": 397 ->  55,"documents_found": 100000}
%009.0 {"took": 352 -> 375,"documents_found":  90000}
%008.0 {"took": 304 -> 317,"documents_found":  80000}
%007.0 {"took": 273 -> 287,"documents_found":  70000}
%005.0 {"took": 199 -> 204,"documents_found":  50000}
%001.0 {"took":  46 ->  46,"documents_found":  10000}
```

Let's explain this with an example. First, jump to `main` and load a
million documents:
```
rm -f /tmp/bulk
for a in {1..1000}; do
    echo '{"index":{}}' >> /tmp/bulk
    echo '{"text":"text '$(printf %04d $a)'"}' >> /tmp/bulk
done

curl -s -uelastic:password -HContent-Type:application/json -XDELETE localhost:9200/test
for a in {1..1000}; do
    echo -n $a:
    curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_bulk?pretty --data-binary @/tmp/bulk | grep errors
done
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_forcemerge?max_num_segments=1
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_refresh
echo
```

Now query them all. Run this a few times until it's stable:
```
echo -n "%100.0 "
curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
    "query": "FROM test | STATS SUM(LENGTH(text))",
    "pragma": {
        "data_partitioning": "shard"
    }
}' | jq -c '{took, documents_found}'
```

Now fetch 99.9% of documents:
```
echo -n "%099.9 "
curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
    "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))",
    "pragma": {
        "data_partitioning": "shard"
    }
}' | jq -c '{took, documents_found}'
```

This should spit out something like:
```
%100.0 {"took": 403,"documents_found":1000000}
%099.9 {"took":4098,"documents_found": 999000}
```

We're loading *fewer* documents but it's slower! What in the world?!
If you dig into the profile you'll see the time is going to value loading:
```
$ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
    "query": "FROM test | STATS SUM(LENGTH(text))",
    "pragma": {
        "data_partitioning": "shard"
    },
    "profile": true
}' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))'
{
  "operator": "ValuesSourceReaderOperator[fields = [text]]",
  "status": {
    "readers_built": {
      "stored_fields[requires_source:true, fields:0, sequential: true]": 222,
      "text:column_at_a_time:null": 222,
      "text:row_stride:BlockSourceReader.Bytes": 1
    },
    "values_loaded": 1000000,
    "process_nanos": 370687157,
    "pages_processed": 222,
    "rows_received": 1000000,
    "rows_emitted": 1000000
  }
}
$ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
    "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))",
    "pragma": {
        "data_partitioning": "shard"
    },
    "profile": true
}' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))'
{
  "operator": "ValuesSourceReaderOperator[fields = [text]]",
  "status": {
    "readers_built": {
      "stored_fields[requires_source:true, fields:0, sequential: false]": 222,
      "text:column_at_a_time:null": 222,
      "text:row_stride:BlockSourceReader.Bytes": 1
    },
    "values_loaded": 999000,
    "process_nanos": 3965803793,
    "pages_processed": 222,
    "rows_received": 999000,
    "rows_emitted": 999000
  }
}
```

It jumps from 370ms to almost four seconds while loading *fewer* values!
The other big difference is the `stored_fields` marker: in the second one
it's `sequential: false` and in the first it's `sequential: true`.

`sequential: true` uses Lucene's "merge" stored fields reader instead of
the default one. It's much better optimized for decoding sequences of
documents.

Previously we only enabled this reader when loading compact sequences of
documents - when the entire block looks like
```
1, 2, 3, 4, 5, ... 1230, 1231
```

If there were any gaps we wouldn't enable it. That was a very
conservative choice we made long ago without running any experiments. We
knew it was faster without any gaps, but not otherwise. It turns out
it's a lot faster in a lot more cases. I've measured it as faster even
with 99% gaps, at least on simple documents. I'm a bit worried that this
is too aggressive, so I've made it configurable, with the default set to
use the "merge" loader with 10% gaps. So we'd use the merge loader with
a block like:
```
1, 11, 21, 31, ..., 1231, 1241
```
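
For reference, the new rule is the density check in
`useSequentialStoredFieldsReader` (see the `ValuesSourceReaderOperator.java`
hunk below). Here's a minimal standalone sketch of that logic; the
`SEQUENTIAL_BOUNDARY` value is a placeholder, since the real constant isn't
shown in this diff:
```java
/** Minimal sketch of the new heuristic; mirrors useSequentialStoredFieldsReader. */
class SequentialReaderHeuristic {
    // Placeholder: the operator's real SEQUENTIAL_BOUNDARY constant isn't in this diff.
    private static final int SEQUENTIAL_BOUNDARY = 512;

    /**
     * @param docIds     sorted doc ids the block needs to load
     * @param proportion index.esql.stored_fields_sequential_proportion (default 0.20)
     */
    static boolean useSequential(int[] docIds, double proportion) {
        int count = docIds.length;
        if (count < SEQUENTIAL_BOUNDARY) {
            return false; // small blocks never opt in
        }
        // How wide a span of doc ids do we touch versus how many docs we actually need?
        int range = docIds[count - 1] - docIds[0];
        // proportion == 1.0 only fires for an (almost) gap-free run: range == count - 1.
        // proportion == 0.2 opts in once we need roughly a fifth of the span,
        // e.g. 1000 docs spread over a 4000-id span: 4000 * 0.2 = 800 <= 1000.
        return range * proportion <= count;
    }
}
```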

ESQL: Fix test locale (#127566)

We were formatting a string without `Locale.ROOT`, so sometimes the
string would use the Arabic-Indic digit ٩ instead of 9. And JSON doesn't
parse those.
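
For illustration, a minimal reproduction of the failure mode; it assumes a
JDK whose CLDR locale data (the default since JDK 9) formats the `ar`
locale with Arabic-Indic digits:
```java
import java.util.Locale;

public class LocaleDigits {
    public static void main(String[] args) {
        Locale ar = Locale.forLanguageTag("ar");
        // Formats with that locale's digits: prints ٩٩٩ on CLDR JDKs.
        System.out.println(String.format(ar, "%d", 999));
        // Locale-independent formatting: always prints 999, which JSON can parse.
        System.out.println(String.format(Locale.ROOT, "%d", 999));
    }
}
```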

Closes #127562
Nik Everett 5 months ago
parent
commit
0201ce63f4
18 changed files with 395 additions and 31 deletions
  1. benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java (+3 -1)
  2. docs/changelog/127348.yaml (+5 -0)
  3. docs/reference/esql/functions/functionNamedParams/qstr.asciidoc (+1 -1)
  4. docs/reference/esql/functions/kibana/definition/qstr.json (+0 -0)
  5. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java (+10 -5)
  6. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java (+1 -1)
  7. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java (+1 -1)
  8. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java (+6 -4)
  9. x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java (+68 -9)
  10. x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java (+2 -2)
  11. x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java (+215 -0)
  12. x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java (+2 -1)
  13. x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java (+11 -0)
  14. x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java (+1 -1)
  15. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java (+8 -1)
  16. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java (+30 -2)
  17. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java (+6 -1)
  18. x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java (+25 -1)

+ 3 - 1
benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

@@ -25,6 +25,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
 import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
@@ -50,6 +51,7 @@ import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.lookup.SearchLookup;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -296,7 +298,7 @@ public class ValuesSourceReaderBenchmark {
             fields(name),
             List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
                 throw new UnsupportedOperationException("can't load _source here");
-            })),
+            }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
             0
         );
         long sum = 0;

+ 5 - 0
docs/changelog/127348.yaml

@@ -0,0 +1,5 @@
+pr: 127348
+summary: Speed loading stored fields
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 1
docs/reference/esql/functions/functionNamedParams/qstr.asciidoc

@@ -21,7 +21,7 @@ default_operator | [keyword] | Default boolean logic used to interpret text in t
 analyzer | [keyword] | Analyzer used to convert the text in the query value into token. Defaults to the index-time analyzer mapped for the default_field.
 fuzzy_max_expansions | [integer] | Maximum number of terms to which the query expands for fuzzy matching. Defaults to 50.
 quote_analyzer | [keyword] | Analyzer used to convert quoted text in the query string into tokens. Defaults to the search_quote_analyzer mapped for the default_field.
-allow_wildcard | [boolean] | If true, the query attempts to analyze wildcard terms in the query string. Defaults to false. 
+allow_wildcard | [boolean] | If true, the query attempts to analyze wildcard terms in the query string. Defaults to false.
 boost | [float] | Floating point number used to decrease or increase the relevance scores of the query.
 quote_field_suffix | [keyword] | Suffix appended to quoted text in the query string.
 enable_position_increments | [boolean] | If true, enable position increments in queries constructed from a query_string search. Defaults to true.

The file diff is not shown because this file is too large
+ 0 - 0
docs/reference/esql/functions/kibana/definition/qstr.json


+ 10 - 5
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

@@ -105,7 +105,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
      */
     public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {}
 
-    public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader) {}
+    public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}
 
     private final FieldWork[] fields;
     private final List<ShardContext> shardContexts;
@@ -241,8 +241,9 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
             }
 
             SourceLoader sourceLoader = null;
+            ShardContext shardContext = shardContexts.get(shard);
             if (storedFieldsSpec.requiresSource()) {
-                sourceLoader = shardContexts.get(shard).newSourceLoader.get();
+                sourceLoader = shardContext.newSourceLoader.get();
                 storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
             }
 
@@ -255,7 +256,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
                 );
             }
             StoredFieldLoader storedFieldLoader;
-            if (useSequentialStoredFieldsReader(docs)) {
+            if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) {
                 storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec);
                 trackStoredFields(storedFieldsSpec, true);
             } else {
@@ -432,9 +433,13 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
      * Is it more efficient to use a sequential stored field reader
      * when reading stored fields for the documents contained in {@code docIds}?
      */
-    private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) {
+    private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) {
         int count = docs.count();
-        return count >= SEQUENTIAL_BOUNDARY && docs.get(count - 1) - docs.get(0) == count - 1;
+        if (count < SEQUENTIAL_BOUNDARY) {
+            return false;
+        }
+        int range = docs.get(count - 1) - docs.get(0);
+        return range * storedFieldsSequentialProportion <= count;
     }
 
     private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) {

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -193,7 +193,7 @@ public class OperatorTests extends MapperServiceTestCase {
                 operators.add(
                     new OrdinalsGroupingOperator(
                         shardIdx -> new KeywordFieldMapper.KeywordFieldType("g").blockLoader(null),
-                        List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+                        List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
                         ElementType.BYTES_REF,
                         0,
                         gField,

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java

@@ -207,7 +207,7 @@ public abstract class LuceneQueryEvaluatorTests<T extends Vector, U extends Vect
                     ),
                     List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
                         throw new UnsupportedOperationException();
-                    })),
+                    }, 0.2)),
                     0
                 )
             );

+ 6 - 4
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java

@@ -199,7 +199,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
     private List<ValuesSourceReaderOperator.ShardContext> initShardContexts() {
         return INDICES.keySet()
             .stream()
-            .map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE))
+            .map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE, 0.2))
             .toList();
     }
 
@@ -1296,7 +1296,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
             LuceneOperator.NO_LIMIT,
             false // no scoring
         );
-        var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE);
+        var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2);
         try (
             Driver driver = new Driver(
                 driverContext,
@@ -1416,7 +1416,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
 
         ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory(
             cases.stream().map(c -> c.info).toList(),
-            List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE)),
+            List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
             0
         );
         assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]"));
@@ -1444,7 +1444,9 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
             List<ValuesSourceReaderOperator.ShardContext> readerShardContexts = new ArrayList<>();
             for (int s = 0; s < shardCount; s++) {
                 contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s));
-                readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE));
+                readerShardContexts.add(
+                    new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE, 0.2)
+                );
             }
             var luceneFactory = new LuceneSourceOperator.Factory(
                 contexts,

+ 68 - 9
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java

@@ -113,6 +113,8 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         { false, true, true },
         { false, false, true, true } };
 
+    static final double STORED_FIELDS_SEQUENTIAL_PROPORTIONS = 0.2;
+
     private Directory directory = newDirectory();
     private MapperService mapperService;
     private IndexReader reader;
@@ -146,7 +148,16 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
                 fail("unexpected shardIdx [" + shardIdx + "]");
             }
             return loader;
-        })), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), 0);
+        })),
+            List.of(
+                new ValuesSourceReaderOperator.ShardContext(
+                    reader,
+                    () -> SourceLoader.FROM_STORED_SOURCE,
+                    STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                )
+            ),
+            0
+        );
     }
 
     @Override
@@ -442,7 +453,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         operators.add(
             new ValuesSourceReaderOperator.Factory(
                 List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)),
-                List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+                List.of(
+                    new ValuesSourceReaderOperator.ShardContext(
+                        reader,
+                        () -> SourceLoader.FROM_STORED_SOURCE,
+                        STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                    )
+                ),
                 0
             ).get(driverContext)
         );
@@ -548,7 +565,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         operators.add(
             new ValuesSourceReaderOperator.Factory(
                 List.of(fieldInfo(mapperService.fieldType("key"), ElementType.INT)),
-                List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+                List.of(
+                    new ValuesSourceReaderOperator.ShardContext(
+                        reader,
+                        () -> SourceLoader.FROM_STORED_SOURCE,
+                        STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                    )
+                ),
                 0
             ).get(driverContext)
         );
@@ -560,7 +583,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             operators.add(
                 new ValuesSourceReaderOperator.Factory(
                     b.stream().map(i -> i.info).toList(),
-                    List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+                    List.of(
+                        new ValuesSourceReaderOperator.ShardContext(
+                            reader,
+                            () -> SourceLoader.FROM_STORED_SOURCE,
+                            STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                        )
+                    ),
                     0
                 ).get(driverContext)
             );
@@ -650,7 +679,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             .map(
                 i -> new ValuesSourceReaderOperator.Factory(
                     List.of(i.info),
-                    List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+                    List.of(
+                        new ValuesSourceReaderOperator.ShardContext(
+                            reader,
+                            () -> SourceLoader.FROM_STORED_SOURCE,
+                            STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                        )
+                    ),
                     0
                 ).get(driverContext)
             )
@@ -1417,7 +1452,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
                             new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS),
                             new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS)
                         ),
-                        List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+                        List.of(
+                            new ValuesSourceReaderOperator.ShardContext(
+                                reader,
+                                () -> SourceLoader.FROM_STORED_SOURCE,
+                                STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                            )
+                        ),
                         0
                     ).get(driverContext)
                 ),
@@ -1463,7 +1504,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
                 fieldInfo(mapperService.fieldType("key"), ElementType.INT),
                 fieldInfo(storedTextField("stored_text"), ElementType.BYTES_REF)
             ),
-            List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+            List.of(
+                new ValuesSourceReaderOperator.ShardContext(
+                    reader,
+                    () -> SourceLoader.FROM_STORED_SOURCE,
+                    STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                )
+            ),
             0
         ).get(driverContext);
         List<Page> results = drive(op, source.iterator(), driverContext);
@@ -1491,7 +1538,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
 
         ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory(
             cases.stream().map(c -> c.info).toList(),
-            List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+            List.of(
+                new ValuesSourceReaderOperator.ShardContext(
+                    reader,
+                    () -> SourceLoader.FROM_STORED_SOURCE,
+                    STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                )
+            ),
             0
         );
         assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]"));
@@ -1518,7 +1571,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             List<ValuesSourceReaderOperator.ShardContext> readerShardContexts = new ArrayList<>();
             for (int s = 0; s < shardCount; s++) {
                 contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s));
-                readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE));
+                readerShardContexts.add(
+                    new ValuesSourceReaderOperator.ShardContext(
+                        readers[s],
+                        () -> SourceLoader.FROM_STORED_SOURCE,
+                        STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                    )
+                );
             }
             var luceneFactory = new LuceneSourceOperator.Factory(
                 contexts,

+ 2 - 2
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -640,7 +640,7 @@ public class RestEsqlIT extends RestEsqlTestCase {
         }
     }
 
-    private MapMatcher commonProfile() {
+    public static MapMatcher commonProfile() {
         return matchesMap().entry("start_millis", greaterThan(0L))
             .entry("stop_millis", greaterThan(0L))
             .entry("iterations", greaterThan(0L))
@@ -655,7 +655,7 @@ public class RestEsqlIT extends RestEsqlTestCase {
      * come back as integers and sometimes longs. This just promotes
      * them to long every time.
      */
-    private void fixTypesOnProfile(Map<String, Object> profile) {
+    public static void fixTypesOnProfile(Map<String, Object> profile) {
         profile.put("iterations", ((Number) profile.get("iterations")).longValue());
         profile.put("cpu_nanos", ((Number) profile.get("cpu_nanos")).longValue());
         profile.put("took_nanos", ((Number) profile.get("took_nanos")).longValue());

+ 215 - 0
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java

@@ -0,0 +1,215 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.qa.single_node;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.test.MapMatcher;
+import org.elasticsearch.test.TestClustersThreadFilter;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.esql.AssertWarnings;
+import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.entityToMap;
+import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder;
+import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsql;
+import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile;
+import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
+
+/**
+ * Tests for {@code index.esql.stored_fields_sequential_proportion} which controls
+ * an optimization we use when loading from {@code _source}.
+ */
+@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
+public class StoredFieldsSequentialIT extends ESRestTestCase {
+    private static final Logger LOG = LogManager.getLogger(StoredFieldsSequentialIT.class);
+
+    @ClassRule
+    public static ElasticsearchCluster cluster = Clusters.testCluster();
+
+    public void testFetchTen() throws IOException {
+        testQuery(null, """
+            FROM test
+            | LIMIT 10
+            """, 10, true);
+    }
+
+    public void testAggAll() throws IOException {
+        testQuery(null, """
+            FROM test
+            | STATS SUM(LENGTH(test))
+            """, 1000, true);
+    }
+
+    public void testAggTwentyPercent() throws IOException {
+        testQuery(null, """
+            FROM test
+            | WHERE STARTS_WITH(test.keyword, "test1") OR STARTS_WITH(test.keyword, "test2")
+            | STATS SUM(LENGTH(test))
+            """, 200, true);
+    }
+
+    public void testAggTenPercentDefault() throws IOException {
+        testAggTenPercent(null, false);
+    }
+
+    public void testAggTenPercentConfiguredToTenPct() throws IOException {
+        testAggTenPercent(0.10, true);
+    }
+
+    public void testAggTenPercentConfiguredToOnePct() throws IOException {
+        testAggTenPercent(0.01, true);
+    }
+
+    /**
+     * It's important for the test that the queries we use detect "scattered" docs.
+     * If they were "compact" in the index we'd still load them using the sequential
+     * reader.
+     */
+    private void testAggTenPercent(Double percent, boolean sequential) throws IOException {
+        String filter = IntStream.range(0, 10)
+            .mapToObj(i -> String.format(Locale.ROOT, "STARTS_WITH(test.keyword, \"test%s%s\")", i, i))
+            .collect(Collectors.joining(" OR "));
+        testQuery(percent, String.format(Locale.ROOT, """
+            FROM test
+            | WHERE %s
+            | STATS SUM(LENGTH(test))
+            """, filter), 100, sequential);
+    }
+
+    private void testQuery(Double percent, String query, int documentsFound, boolean sequential) throws IOException {
+        setPercent(percent);
+        RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query(query);
+        builder.profile(true);
+        Map<String, Object> result = runEsql(builder, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC);
+        assertMap(
+            result,
+            matchesMap()
+                // .entry("documents_found", documentsFound) Backport incoming maybe
+                .entry("profile", matchesMap().entry("drivers", instanceOf(List.class)))
+                .extraOk()
+        );
+
+        @SuppressWarnings("unchecked")
+        List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
+        for (Map<String, Object> p : profiles) {
+            fixTypesOnProfile(p);
+            assertThat(p, commonProfile());
+            @SuppressWarnings("unchecked")
+            List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
+            for (Map<String, Object> o : operators) {
+                LOG.info("profile {}", o.get("operator"));
+            }
+            for (Map<String, Object> o : operators) {
+                checkOperatorProfile(o, sequential);
+            }
+        }
+    }
+
+    private void setPercent(Double percent) throws IOException {
+        Request set = new Request("PUT", "test/_settings");
+        set.setJsonEntity(String.format(Locale.ROOT, """
+            {
+                "index": {
+                    "esql": {
+                        "stored_fields_sequential_proportion": %s
+                    }
+                }
+            }
+            """, percent));
+        assertMap(entityToMap(client().performRequest(set).getEntity(), XContentType.JSON), matchesMap().entry("acknowledged", true));
+    }
+
+    @Before
+    public void buildIndex() throws IOException {
+        Request exists = new Request("GET", "test");
+        try {
+            client().performRequest(exists);
+            return;
+        } catch (ResponseException e) {
+            if (e.getResponse().getStatusLine().getStatusCode() != 404) {
+                throw e;
+            }
+        }
+        Request createIndex = new Request("PUT", "test");
+        createIndex.setJsonEntity("""
+            {
+              "settings": {
+                "index": {
+                  "number_of_shards": 1,
+                  "sort.field": "i"
+                }
+              },
+              "mappings": {
+                "properties": {
+                  "i": {"type": "long"}
+                }
+              }
+            }""");
+        Response createResponse = client().performRequest(createIndex);
+        assertThat(
+            entityToMap(createResponse.getEntity(), XContentType.JSON),
+            matchesMap().entry("shards_acknowledged", true).entry("index", "test").entry("acknowledged", true)
+        );
+
+        Request bulk = new Request("POST", "/_bulk");
+        bulk.addParameter("refresh", "");
+        StringBuilder b = new StringBuilder();
+        for (int i = 0; i < 1000; i++) {
+            b.append(String.format(Locale.ROOT, """
+                {"create":{"_index":"test"}}
+                {"test":"test%03d", "i": %d}
+                """, i, i));
+        }
+        bulk.setJsonEntity(b.toString());
+        Response bulkResponse = client().performRequest(bulk);
+        assertThat(entityToMap(bulkResponse.getEntity(), XContentType.JSON), matchesMap().entry("errors", false).extraOk());
+    }
+
+    @Override
+    protected boolean preserveIndicesUponCompletion() {
+        return true;
+    }
+
+    private static void checkOperatorProfile(Map<String, Object> o, boolean sequential) {
+        String name = (String) o.get("operator");
+        if (name.startsWith("ValuesSourceReaderOperator")) {
+            MapMatcher readersBuilt = matchesMap().entry(
+                "stored_fields[requires_source:true, fields:0, sequential: " + sequential + "]",
+                greaterThanOrEqualTo(1)
+            ).extraOk();
+            MapMatcher expectedOp = matchesMap().entry("operator", name)
+                .entry("status", matchesMap().entry("readers_built", readersBuilt).extraOk());
+            assertMap(o, expectedOp);
+        }
+    }
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+}

+ 2 - 1
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -1233,7 +1233,8 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         return runEsqlAsync(requestObject, randomBoolean(), new AssertWarnings.NoWarnings());
     }
 
-    static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) throws IOException {
+    public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode)
+        throws IOException {
         if (mode == ASYNC) {
             return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
         } else {

+ 11 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql;
 
+import org.apache.http.HttpEntity;
 import org.apache.lucene.document.InetAddressPoint;
 import org.apache.lucene.sandbox.document.HalfFloatPoint;
 import org.apache.lucene.util.BytesRef;
@@ -21,6 +22,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockUtils;
@@ -40,6 +42,7 @@ import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
@@ -852,6 +855,14 @@ public final class EsqlTestUtils {
             .ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause()));
     }
 
+    public static Map<String, Object> entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException {
+        try (InputStream content = entity.getContent()) {
+            XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue());
+            assertEquals(expectedContentType, xContentType);
+            return XContentHelper.convertToMap(xContentType.xContent(), content, false /* ordered */);
+        }
+    }
+
     /**
      * Errors from remotes are wrapped in RemoteException while the ones from the local cluster
      * aren't. This utility method is useful for unwrapping in such cases.

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

@@ -197,7 +197,7 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
                 ),
                 List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> {
                     throw new IllegalStateException("can't load source here");
-                })),
+                }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
                 0
             );
             CancellableTask parentTask = new EsqlQueryTask(

+ 8 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

@@ -21,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.compute.data.Block;
@@ -401,7 +402,13 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
         return new ValuesSourceReaderOperator(
             driverContext.blockFactory(),
             fields,
-            List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)),
+            List.of(
+                new ValuesSourceReaderOperator.ShardContext(
+                    shardContext.searcher().getIndexReader(),
+                    shardContext::newSourceLoader,
+                    EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY)
+                )
+            ),
             0
         );
     }

+ 30 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -59,6 +59,7 @@ import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -93,6 +94,16 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
          * Returns something to load values from this field into a {@link Block}.
          */
         BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);
+
+        /**
+         * Tuning parameter for deciding when to use the "merge" stored field loader.
+         * Think of it as "how similar to a sequential block of documents do I have to
+         * be before I'll use the merge reader?" So a value of {@code 1} means I have to
+         * be <strong>exactly</strong> a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}.
+         * A value of {@code .2} means we'll use the sequential reader even if we only
+         * need one in ten documents.
+         */
+        double storedFieldsSequentialProportion();
     }
 
     private final List<ShardContext> shardContexts;
@@ -107,7 +118,13 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         Layout.Builder layout = source.layout.builder();
         var sourceAttr = fieldExtractExec.sourceAttribute();
         List<ValuesSourceReaderOperator.ShardContext> readers = shardContexts.stream()
-            .map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader))
+            .map(
+                s -> new ValuesSourceReaderOperator.ShardContext(
+                    s.searcher().getIndexReader(),
+                    s::newSourceLoader,
+                    s.storedFieldsSequentialProportion()
+                )
+            )
             .toList();
         List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>();
         int docChannel = source.layout.get(sourceAttr.id()).channel();
@@ -235,7 +252,13 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         var sourceAttribute = FieldExtractExec.extractSourceAttributesFrom(aggregateExec.child());
         int docChannel = source.layout.get(sourceAttribute.id()).channel();
         List<ValuesSourceReaderOperator.ShardContext> vsShardContexts = shardContexts.stream()
-            .map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader))
+            .map(
+                s -> new ValuesSourceReaderOperator.ShardContext(
+                    s.searcher().getIndexReader(),
+                    s::newSourceLoader,
+                    s.storedFieldsSequentialProportion()
+                )
+            )
             .toList();
         // The grouping-by values are ready, let's group on them directly.
         // Costin: why are they ready and not already exposed in the layout?
@@ -369,6 +392,11 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
 
             return loader;
         }
+
+        @Override
+        public double storedFieldsSequentialProportion() {
+            return EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.get(ctx.getIndexSettings().getSettings());
+        }
     }
 
     private static class TypeConvertingBlockLoader implements BlockLoader {

+ 6 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -379,6 +379,11 @@ public class ComputeService {
                 new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter())
             );
         }
+        EsPhysicalOperationProviders physicalOperationProviders = new EsPhysicalOperationProviders(
+            context.foldCtx(),
+            contexts,
+            searchService.getIndicesService().getAnalysis()
+        );
         final List<Driver> drivers;
         try {
             LocalExecutionPlanner planner = new LocalExecutionPlanner(
@@ -393,7 +398,7 @@ public class ComputeService {
                 context.exchangeSinkSupplier(),
                 enrichLookupService,
                 lookupFromIndexService,
-                new EsPhysicalOperationProviders(context.foldCtx(), contexts, searchService.getIndicesService().getAnalysis()),
+                physicalOperationProviders,
                 contexts
             );
 

+ 25 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -153,6 +153,29 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
         Setting.Property.Dynamic
     );
 
+    /**
+     * Tuning parameter for deciding when to use the "merge" stored field loader.
+     * Think of it as "how similar to a sequential block of documents do I have to
+     * be before I'll use the merge reader?" So a value of {@code 1} means I have to
+     * be <strong>exactly</strong> a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}.
+     * A value of {@code .2} means we'll use the sequential reader even if we only
+     * need one in ten documents.
+     * <p>
+     *     The default value of this was experimentally derived using a
+     *     <a href="https://gist.github.com/nik9000/ac6857de10745aad210b6397915ff846">script</a>.
+     *     And a little paranoia. A lower default value was looking good locally, but
+     *     I'm concerned about the implications of effectively using this all the time.
+     * </p>
+     */
+    public static final Setting<Double> STORED_FIELDS_SEQUENTIAL_PROPORTION = Setting.doubleSetting(
+        "index.esql.stored_fields_sequential_proportion",
+        0.20,
+        0,
+        1,
+        Setting.Property.IndexScope,
+        Setting.Property.Dynamic
+    );
+
     @Override
     public Collection<?> createComponents(PluginServices services) {
         CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
@@ -215,7 +238,8 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
             ESQL_QUERYLOG_THRESHOLD_DEBUG_SETTING,
             ESQL_QUERYLOG_THRESHOLD_INFO_SETTING,
             ESQL_QUERYLOG_THRESHOLD_WARN_SETTING,
-            ESQL_QUERYLOG_INCLUDE_USER_SETTING
+            ESQL_QUERYLOG_INCLUDE_USER_SETTING,
+            STORED_FIELDS_SEQUENTIAL_PROPORTION
         );
     }
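
To round this out, a small hedged sketch of how the new setting resolves,
using only the `Setting` methods that appear in this diff (`getDefault`
and `get`); the demo class and values are illustrative:
```java
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;

class StoredFieldsProportionDemo {
    static void demo() {
        // Default when an index doesn't override it: 0.20.
        double dflt = EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY);

        // Dynamic per-index override, the same key StoredFieldsSequentialIT PUTs via _settings.
        Settings overridden = Settings.builder()
            .put("index.esql.stored_fields_sequential_proportion", 0.05)
            .build();
        double tuned = EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.get(overridden);

        assert dflt == 0.20;
        assert tuned == 0.05;
    }
}
```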
 

Some files were not shown in this diff because too many files have changed