
Speed up loading keyword fields with index sorts (#132950)

Reading keyword fields that are the primary index sort field can be sped up by skipping reads, since identical values are stored together. Ideally we would take these values from the doc_values skipper instead of loading them from doc_values, but the skipper is not enabled yet. Instead, we use two buffers when reading ordinals: one for the beginning of the requested range and one for the end. If both return the same ordinal, we can skip decoding everything in between and emit a constant block. A follow-up will handle the non-constant case: scan forward until the last value is reached, then fill the remaining positions with it. This optimization should speed up time-series queries.
Nhat Nguyen · 2 months ago · commit 64f8209da4
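
In code terms, the fast path rests on a simple invariant: within a segment sorted on the field, ordinals are non-decreasing, so if the first and last ordinal of a requested doc range match, every position in between must hold the same value. The sketch below is a minimal standalone model of that check (plain arrays stand in for doc_values ordinals; the names are illustrative, not taken from this change):

/** Minimal model of the first/last-ordinal check behind the constant-block fast path. */
public class ConstantRangeDemo {
    /** Ordinal shared by docs[from..to], or -1 if the range is not provably constant. */
    static long constantOrdinal(long[] ords, int from, int to) {
        long first = ords[from]; // one read at the start of the range
        long last = ords[to];    // one read at the end of the range
        return first == last ? first : -1; // sorted => equal endpoints imply a constant middle
    }

    public static void main(String[] args) {
        long[] ords = {0, 0, 0, 1, 1, 2}; // non-decreasing, as in a sorted segment
        System.out.println(constantOrdinal(ords, 0, 2)); // 0  -> emit one constant block
        System.out.println(constantOrdinal(ords, 2, 4)); // -1 -> fall back to per-doc reads
    }
}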

+ 5 - 0
docs/changelog/132950.yaml

@@ -0,0 +1,5 @@
+pr: 132950
+summary: Speed up loading keyword fields with index sorts
+area: "ES|QL"
+type: enhancement
+issues: []

+ 0 - 27
server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/BulkNumericDocValues.java

@@ -1,27 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-
-package org.elasticsearch.index.codec.tsdb.es819;
-
-import org.apache.lucene.index.NumericDocValues;
-import org.elasticsearch.index.mapper.BlockLoader;
-
-import java.io.IOException;
-
-/**
- * An es819 doc values specialization that allows bulk loading of values that is optimized in the context of compute engine.
- */
-public abstract class BulkNumericDocValues extends NumericDocValues {
-
-    /**
-     * Reads the values of all documents in {@code docs}.
-     */
-    public abstract BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException;
-
-}

+ 101 - 20
server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java

@@ -53,6 +53,7 @@ import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.
 
 final class ES819TSDBDocValuesProducer extends DocValuesProducer {
     final IntObjectHashMap<NumericEntry> numerics;
+    private int primarySortFieldNumber = -1;
     final IntObjectHashMap<BinaryEntry> binaries;
     final IntObjectHashMap<SortedEntry> sorted;
     final IntObjectHashMap<SortedSetEntry> sortedSets;
@@ -91,7 +92,14 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
                 );
 
                 readFields(in, state.fieldInfos);
-
+                final var indexSort = state.segmentInfo.getIndexSort();
+                if (indexSort != null && indexSort.getSort().length > 0) {
+                    var primarySortField = indexSort.getSort()[0];
+                    var sortField = state.fieldInfos.fieldInfo(primarySortField.getField());
+                    if (sortField != null) {
+                        primarySortFieldNumber = sortField.number;
+                    }
+                }
             } catch (Throwable exception) {
                 priorE = exception;
             } finally {
@@ -333,10 +341,10 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
     @Override
     public SortedDocValues getSorted(FieldInfo field) throws IOException {
         SortedEntry entry = sorted.get(field.number);
-        return getSorted(entry);
+        return getSorted(entry, field.number == primarySortFieldNumber);
     }
 
-    private SortedDocValues getSorted(SortedEntry entry) throws IOException {
+    private SortedDocValues getSorted(SortedEntry entry, boolean valuesSorted) throws IOException {
         final NumericDocValues ords = getNumeric(entry.ordsEntry, entry.termsDictEntry.termsDictSize);
         return new BaseSortedDocValues(entry) {
 
@@ -369,10 +377,29 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
             public long cost() {
                 return ords.cost();
             }
+
+            @Override
+            public BlockLoader.Block tryRead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
+                if (valuesSorted && ords instanceof BaseDenseNumericValues denseOrds) {
+                    int firstDoc = docs.get(offset);
+                    denseOrds.advanceExact(firstDoc);
+                    long startValue = denseOrds.longValue();
+                    final int docCount = docs.count();
+                    int lastDoc = docs.get(docCount - 1);
+                    long lastValue = denseOrds.lookAheadValueAt(lastDoc);
+                    if (lastValue == startValue) {
+                        BytesRef b = lookupOrd(Math.toIntExact(startValue));
+                        return factory.constantBytes(BytesRef.deepCopyOf(b), docCount - offset);
+                    }
+                    // TODO: Since ordinals are sorted, start at 0 (offset by startValue), scan until lastValue,
+                    // then fill remaining positions with lastValue.
+                }
+                return null;
+            }
         };
     }
 
-    abstract class BaseSortedDocValues extends SortedDocValues {
+    abstract class BaseSortedDocValues extends SortedDocValues implements BlockLoader.OptionalColumnAtATimeReader {
 
         final SortedEntry entry;
         final TermsEnum termsEnum;
@@ -406,6 +433,15 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
         public TermsEnum termsEnum() throws IOException {
             return new TermsDict(entry.termsDictEntry, data, merging);
         }
+
+        @Override
+        public BlockLoader.Block tryRead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
+            return null;
+        }
+    }
+
+    abstract static class BaseDenseNumericValues extends NumericDocValues implements BlockLoader.OptionalColumnAtATimeReader {
+        abstract long lookAheadValueAt(int targetDoc) throws IOException;
     }
 
     abstract static class BaseSortedSetDocValues extends SortedSetDocValues {
@@ -695,7 +731,7 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
     public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
         SortedSetEntry entry = sortedSets.get(field.number);
         if (entry.singleValueEntry != null) {
-            return DocValues.singleton(getSorted(entry.singleValueEntry));
+            return DocValues.singleton(getSorted(entry.singleValueEntry, field.number == primarySortFieldNumber));
         }
 
         SortedNumericEntry ordsEntry = entry.ordsEntry;
@@ -1047,7 +1083,7 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
             // Special case for maxOrd 1, no need to read blocks and use ordinal 0 as only value
             if (entry.docsWithFieldOffset == -1) {
                 // Special case when all docs have a value
-                return new NumericDocValues() {
+                return new BaseDenseNumericValues() {
 
                     private final int maxDoc = ES819TSDBDocValuesProducer.this.maxDoc;
                     private int doc = -1;
@@ -1086,6 +1122,17 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
                     public long cost() {
                         return maxDoc;
                     }
+
+                    @Override
+                    long lookAheadValueAt(int targetDoc) throws IOException {
+                        return 0L;  // Only one ordinal!
+                    }
+
+                    @Override
+                    public BlockLoader.Block tryRead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset)
+                        throws IOException {
+                        return null;
+                    }
                 };
             } else {
                 final IndexedDISI disi = new IndexedDISI(
@@ -1141,13 +1188,17 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
         final int bitsPerOrd = maxOrd >= 0 ? PackedInts.bitsRequired(maxOrd - 1) : -1;
         if (entry.docsWithFieldOffset == -1) {
             // dense
-            return new BulkNumericDocValues() {
+            return new BaseDenseNumericValues() {
 
                 private final int maxDoc = ES819TSDBDocValuesProducer.this.maxDoc;
                 private int doc = -1;
                 private final TSDBDocValuesEncoder decoder = new TSDBDocValuesEncoder(ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE);
                 private long currentBlockIndex = -1;
                 private final long[] currentBlock = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
+                // lookahead block
+                private long lookaheadBlockIndex = -1;
+                private long[] lookaheadBlock;
+                private IndexInput lookaheadData = null;
 
                 @Override
                 public int docID() {
@@ -1183,24 +1234,28 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
                     final int index = doc;
                     final int blockIndex = index >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
                     final int blockInIndex = index & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
-                    if (blockIndex != currentBlockIndex) {
-                        assert blockIndex > currentBlockIndex : blockIndex + " < " + currentBlockIndex;
-                        // no need to seek if the loading block is the next block
-                        if (currentBlockIndex + 1 != blockIndex) {
-                            valuesData.seek(indexReader.get(blockIndex));
-                        }
-                        currentBlockIndex = blockIndex;
-                        if (maxOrd >= 0) {
-                            decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
-                        } else {
-                            decoder.decode(valuesData, currentBlock);
-                        }
+                    if (blockIndex == currentBlockIndex) {
+                        return currentBlock[blockInIndex];
+                    }
+                    if (blockIndex == lookaheadBlockIndex) {
+                        return lookaheadBlock[blockInIndex];
+                    }
+                    assert blockIndex > currentBlockIndex : blockIndex + " < " + currentBlockIndex;
+                    // no need to seek if the loading block is the next block
+                    if (currentBlockIndex + 1 != blockIndex) {
+                        valuesData.seek(indexReader.get(blockIndex));
+                    }
+                    currentBlockIndex = blockIndex;
+                    if (maxOrd >= 0) {
+                        decoder.decodeOrdinals(valuesData, currentBlock, bitsPerOrd);
+                    } else {
+                        decoder.decode(valuesData, currentBlock);
                     }
                     return currentBlock[blockInIndex];
                 }
 
                 @Override
-                public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
+                public BlockLoader.Block tryRead(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
                     assert maxOrd == -1 : "unexpected maxOrd[" + maxOrd + "]";
                     final int docsCount = docs.count();
                     doc = docs.get(docsCount - 1);
@@ -1238,6 +1293,32 @@ final class ES819TSDBDocValuesProducer extends DocValuesProducer {
                     }
                 }
 
+                @Override
+                long lookAheadValueAt(int targetDoc) throws IOException {
+                    final int blockIndex = targetDoc >>> ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SHIFT;
+                    final int valueIndex = targetDoc & ES819TSDBDocValuesFormat.NUMERIC_BLOCK_MASK;
+                    if (blockIndex == currentBlockIndex) {
+                        return currentBlock[valueIndex];
+                    }
+                    // load data to the lookahead block
+                    if (lookaheadBlockIndex != blockIndex) {
+                        if (lookaheadBlock == null) {
+                            lookaheadBlock = new long[ES819TSDBDocValuesFormat.NUMERIC_BLOCK_SIZE];
+                            lookaheadData = data.slice("look_ahead_values", entry.valuesOffset, entry.valuesLength);
+                        }
+                        if (lookaheadBlockIndex + 1 != blockIndex) {
+                            lookaheadData.seek(indexReader.get(blockIndex));
+                        }
+                        if (maxOrd == -1L) {
+                            decoder.decode(lookaheadData, lookaheadBlock);
+                        } else {
+                            decoder.decodeOrdinals(lookaheadData, lookaheadBlock, bitsPerOrd);
+                        }
+                        lookaheadBlockIndex = blockIndex;
+                    }
+                    return lookaheadBlock[valueIndex];
+                }
+
                 static boolean isDense(int firstDocId, int lastDocId, int length) {
                     // This does not detect duplicate docids (e.g [1, 1, 2, 4] would be detected as dense),
                     // this can happen with enrich or lookup. However this codec isn't used for enrich / lookup.
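
The dedicated lookahead buffer above exists so that peeking at the last doc of a range does not disturb the forward-only main cursor: decoding that block through valuesData would force a backward seek when sequential iteration resumes, so lookAheadValueAt decodes into its own buffer over a separate IndexInput slice. Resolving a doc to a (block, slot) pair is plain shift/mask arithmetic; here is a standalone illustration, where the 128-value block size (shift 7, mask 127) is an assumption mirroring NUMERIC_BLOCK_SIZE rather than the real constants:

/** Standalone illustration of the block/slot math in lookAheadValueAt (assumed block size 128). */
public class BlockMathDemo {
    static final int NUMERIC_BLOCK_SHIFT = 7;                             // assumed: 128 values per block
    static final int NUMERIC_BLOCK_MASK = (1 << NUMERIC_BLOCK_SHIFT) - 1; // 127

    public static void main(String[] args) {
        int targetDoc = 5000;
        int blockIndex = targetDoc >>> NUMERIC_BLOCK_SHIFT; // 5000 / 128 = 39
        int valueIndex = targetDoc & NUMERIC_BLOCK_MASK;    // 5000 % 128 = 8
        System.out.println("block " + blockIndex + ", slot " + valueIndex); // block 39, slot 8
    }
}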

+ 11 - 3
server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java

@@ -22,7 +22,6 @@ import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
 import org.elasticsearch.index.IndexVersion;
-import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
 import org.elasticsearch.index.mapper.BlockLoader.BlockFactory;
 import org.elasticsearch.index.mapper.BlockLoader.BooleanBuilder;
 import org.elasticsearch.index.mapper.BlockLoader.Builder;
@@ -133,8 +132,11 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
 
         @Override
         public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
-            if (numericDocValues instanceof BulkNumericDocValues bulkDv) {
-                return bulkDv.read(factory, docs, offset);
+            if (numericDocValues instanceof BlockLoader.OptionalColumnAtATimeReader direct) {
+                BlockLoader.Block result = direct.tryRead(factory, docs, offset);
+                if (result != null) {
+                    return result;
+                }
             }
             try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count() - offset)) {
                 int lastDoc = -1;
@@ -748,6 +750,12 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
             if (docs.count() - offset == 1) {
                 return readSingleDoc(factory, docs.get(offset));
             }
+            if (ordinals instanceof BlockLoader.OptionalColumnAtATimeReader direct) {
+                BlockLoader.Block block = direct.tryRead(factory, docs, offset);
+                if (block != null) {
+                    return block;
+                }
+            }
             try (var builder = factory.singletonOrdinalsBuilder(ordinals, docs.count() - offset)) {
                 for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);

+ 17 - 1
server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java

@@ -13,6 +13,7 @@ import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.search.fetch.StoredFieldsSpec;
 import org.elasticsearch.search.lookup.Source;
@@ -46,6 +47,22 @@ public interface BlockLoader {
         BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException;
     }
 
+    /**
+     * An interface for readers that attempt to load all document values in a column-at-a-time fashion.
+     * <p>
+     * Unlike {@link ColumnAtATimeReader}, implementations may return {@code null} if they are unable
+     * to load the requested values, for example due to unsupported underlying data.
+     * This allows callers to optimistically try optimized loading strategies first, and fall back if necessary.
+     */
+    interface OptionalColumnAtATimeReader {
+        /**
+         * Attempts to read the values of all documents in {@code docs}, starting at position {@code offset}.
+         * Returns {@code null} if unable to load the values.
+         */
+        @Nullable
+        BlockLoader.Block tryRead(BlockFactory factory, Docs docs, int offset) throws IOException;
+    }
+
     interface RowStrideReader extends Reader {
         /**
          * Reads the values of the given document into the builder.
@@ -549,6 +566,5 @@ public interface BlockLoader {
         DoubleBuilder sum();
 
         IntBuilder count();
-
     }
 }
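
The contract is deliberately loose: tryRead is best-effort, and null means "take the general path". A minimal, self-contained model of a caller honoring that contract (illustrative names and simplified types, not the Elasticsearch API):

/** Simplified model of the OptionalColumnAtATimeReader contract. */
public class OptionalReadDemo {
    interface OptionalReader {
        /** Best-effort bulk read of docs [from, to]; null means the caller must fall back. */
        long[] tryRead(int from, int to);
    }

    static long[] read(OptionalReader reader, long[] perDocValues, int from, int to) {
        long[] block = reader.tryRead(from, to); // optimistic attempt first
        if (block != null) {
            return block;
        }
        long[] out = new long[to - from + 1];    // general row-by-row path
        for (int i = from; i <= to; i++) {
            out[i - from] = perDocValues[i];     // stand-in for a per-doc doc_values read
        }
        return out;
    }

    public static void main(String[] args) {
        long[] values = {4, 4, 4, 9};
        OptionalReader declining = (from, to) -> null; // no fast path for this field
        System.out.println(java.util.Arrays.toString(read(declining, values, 0, 3))); // [4, 4, 4, 9]
    }
}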

+ 156 - 23
server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java

@@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SortedDocValues;
@@ -31,23 +32,32 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSortField;
+import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.lucene.BytesRefs;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec;
 import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests;
+import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.index.mapper.TestBlock;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests {
 
     private final Codec codec = new Elasticsearch900Lucene101Codec() {
@@ -743,9 +753,9 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
             try (var reader = DirectoryReader.open(iw)) {
                 int gaugeIndex = numDocs;
                 for (var leaf : reader.leaves()) {
-                    var timestampDV = getBulkNumericDocValues(leaf.reader(), timestampField);
-                    var counterDV = getBulkNumericDocValues(leaf.reader(), counterField);
-                    var gaugeDV = getBulkNumericDocValues(leaf.reader(), gaugeField);
+                    var timestampDV = getColumnAtTimeReader(leaf.reader(), timestampField);
+                    var counterDV = getColumnAtTimeReader(leaf.reader(), counterField);
+                    var gaugeDV = getColumnAtTimeReader(leaf.reader(), gaugeField);
                     int maxDoc = leaf.reader().maxDoc();
                     for (int i = 0; i < maxDoc;) {
                         int size = Math.max(1, random().nextInt(0, maxDoc - i));
@@ -753,7 +763,8 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
 
                         {
                             // bulk loading timestamp:
-                            var block = (TestBlock) timestampDV.read(factory, docs, 0);
+                            var block = (TestBlock) timestampDV.tryRead(factory, docs, 0);
+                            assertNotNull(block);
                             assertEquals(size, block.size());
                             for (int j = 0; j < block.size(); j++) {
                                 long actualTimestamp = (long) block.get(j);
@@ -764,7 +775,8 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
                         }
                         {
                             // bulk loading counter field:
-                            var block = (TestBlock) counterDV.read(factory, docs, 0);
+                            var block = (TestBlock) counterDV.tryRead(factory, docs, 0);
+                            assertNotNull(block);
                             assertEquals(size, block.size());
                             for (int j = 0; j < block.size(); j++) {
                                 long actualCounter = (long) block.get(j);
@@ -775,7 +787,8 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
                         }
                         {
                             // bulk loading gauge field:
-                            var block = (TestBlock) gaugeDV.read(factory, docs, 0);
+                            var block = (TestBlock) gaugeDV.tryRead(factory, docs, 0);
+                            assertNotNull(block);
                             assertEquals(size, block.size());
                             for (int j = 0; j < block.size(); j++) {
                                 long actualGauge = (long) block.get(j);
@@ -803,15 +816,16 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
                 int size = maxDoc - randomOffset;
                 int gaugeIndex = size;
 
-                var timestampDV = getBulkNumericDocValues(leafReader, timestampField);
-                var counterDV = getBulkNumericDocValues(leafReader, counterField);
-                var gaugeDV = getBulkNumericDocValues(leafReader, gaugeField);
+                var timestampDV = getColumnAtTimeReader(leafReader, timestampField);
+                var counterDV = getColumnAtTimeReader(leafReader, counterField);
+                var gaugeDV = getColumnAtTimeReader(leafReader, gaugeField);
 
                 var docs = TestBlock.docs(IntStream.range(0, maxDoc).toArray());
 
                 {
                     // bulk loading timestamp:
-                    var block = (TestBlock) timestampDV.read(blockFactory, docs, randomOffset);
+                    var block = (TestBlock) timestampDV.tryRead(blockFactory, docs, randomOffset);
+                    assertNotNull(block);
                     assertEquals(size, block.size());
                     for (int j = 0; j < block.size(); j++) {
                         long actualTimestamp = (long) block.get(j);
@@ -822,7 +836,8 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
                 }
                 {
                     // bulk loading counter field:
-                    var block = (TestBlock) counterDV.read(factory, docs, randomOffset);
+                    var block = (TestBlock) counterDV.tryRead(factory, docs, randomOffset);
+                    assertNotNull(block);
                     assertEquals(size, block.size());
                     for (int j = 0; j < block.size(); j++) {
                         long actualCounter = (long) block.get(j);
@@ -833,7 +848,8 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
                 }
                 {
                     // bulk loading gauge field:
-                    var block = (TestBlock) gaugeDV.read(factory, docs, randomOffset);
+                    var block = (TestBlock) gaugeDV.tryRead(factory, docs, randomOffset);
+                    assertNotNull(block);
                     assertEquals(size, block.size());
                     for (int j = 0; j < block.size(); j++) {
                         long actualGauge = (long) block.get(j);
@@ -847,16 +863,17 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
                 size = docs.count();
                 // Test against values loaded using normal doc value apis:
                 long[] expectedCounters = new long[size];
-                counterDV = getBulkNumericDocValues(leafReader, counterField);
+                counterDV = getColumnAtTimeReader(leafReader, counterField);
                 for (int i = 0; i < docs.count(); i++) {
                     int docId = docs.get(i);
                     counterDV.advanceExact(docId);
                     expectedCounters[i] = counterDV.longValue();
                 }
-                counterDV = getBulkNumericDocValues(leafReader, counterField);
+                counterDV = getColumnAtTimeReader(leafReader, counterField);
                 {
                     // bulk loading counter field:
-                    var block = (TestBlock) counterDV.read(factory, docs, 0);
+                    var block = (TestBlock) counterDV.tryRead(factory, docs, 0);
+                    assertNotNull(block);
                     assertEquals(size, block.size());
                     for (int j = 0; j < block.size(); j++) {
                         long actualCounter = (long) block.get(j);
@@ -920,9 +937,9 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
                         false
                     );
                     assertEquals(numDocsPerQValue, topDocs.totalHits.value());
-                    var timestampDV = getBulkNumericDocValues(leafReader, timestampField);
+                    var timestampDV = getColumnAtTimeReader(leafReader, timestampField);
                     long[] expectedTimestamps = new long[numDocsPerQValue];
-                    var counterDV = getBulkNumericDocValues(leafReader, counterField);
+                    var counterDV = getColumnAtTimeReader(leafReader, counterField);
                     long[] expectedCounters = new long[numDocsPerQValue];
                     int[] docIds = new int[numDocsPerQValue];
                     for (int i = 0; i < topDocs.scoreDocs.length; i++) {
@@ -938,8 +955,9 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
 
                     var docs = TestBlock.docs(docIds);
                     {
-                        timestampDV = getBulkNumericDocValues(leafReader, timestampField);
-                        var block = (TestBlock) timestampDV.read(factory, docs, 0);
+                        timestampDV = getColumnAtTimeReader(leafReader, timestampField);
+                        var block = (TestBlock) timestampDV.tryRead(factory, docs, 0);
+                        assertNotNull(block);
                         assertEquals(numDocsPerQValue, block.size());
                         for (int j = 0; j < block.size(); j++) {
                             long actualTimestamp = (long) block.get(j);
@@ -948,8 +966,9 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
                         }
                     }
                     {
-                        counterDV = getBulkNumericDocValues(leafReader, counterField);
-                        var block = (TestBlock) counterDV.read(factory, docs, 0);
+                        counterDV = getColumnAtTimeReader(leafReader, counterField);
+                        var block = (TestBlock) counterDV.tryRead(factory, docs, 0);
+                        assertNotNull(block);
                         assertEquals(numDocsPerQValue, block.size());
                         for (int j = 0; j < block.size(); j++) {
                             long actualCounter = (long) block.get(j);
@@ -962,8 +981,122 @@ public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests
         }
     }
 
-    private static BulkNumericDocValues getBulkNumericDocValues(LeafReader leafReader, String counterField) throws IOException {
-        return (BulkNumericDocValues) DocValues.unwrapSingleton(leafReader.getSortedNumericDocValues(counterField));
+    public void testLoadKeywordFieldWithIndexSorts() throws IOException {
+        String primaryField = "sorted_first";
+        String secondField = "sorted_second";
+        String unsortedField = "no_sort";
+        String sparseField = "sparse";
+        var config = new IndexWriterConfig();
+        config.setIndexSort(new Sort(new SortField(primaryField, SortField.Type.STRING, false)));
+        config.setMergePolicy(new LogByteSizeMergePolicy());
+        config.setCodec(getCodec());
+        Map<Integer, String> hostnames = new HashMap<>();
+        try (Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, config)) {
+            int numDocs = ESTestCase.randomIntBetween(100, 5000);
+            for (int i = 0; i < numDocs; i++) {
+                hostnames.put(i, "h" + random().nextInt(10));
+            }
+            List<Integer> ids = new ArrayList<>(hostnames.keySet());
+            Randomness.shuffle(ids);
+            Set<Integer> sparseIds = new HashSet<>(ESTestCase.randomSubsetOf(ESTestCase.between(1, ids.size() / 2), ids));
+            for (Integer id : ids) {
+                var d = new Document();
+                String hostname = hostnames.get(id);
+                d.add(new NumericDocValuesField("id", id));
+                d.add(new SortedDocValuesField(primaryField, new BytesRef(hostname)));
+                d.add(new SortedDocValuesField(secondField, new BytesRef(hostname)));
+                d.add(new SortedDocValuesField(unsortedField, new BytesRef(hostname)));
+                if (sparseIds.contains(id)) {
+                    d.add(new SortedDocValuesField(sparseField, new BytesRef(hostname)));
+                }
+                writer.addDocument(d);
+                if (random().nextInt(100) < 10) {
+                    writer.flush();
+                }
+            }
+            for (int iter = 0; iter < 2; iter++) {
+                var factory = TestBlock.factory();
+                try (DirectoryReader reader = DirectoryReader.open(writer)) {
+                    for (LeafReaderContext leaf : reader.leaves()) {
+                        BlockLoader.Docs docs = new BlockLoader.Docs() {
+                            @Override
+                            public int count() {
+                                return leaf.reader().maxDoc();
+                            }
+
+                            @Override
+                            public int get(int i) {
+                                return i;
+                            }
+                        };
+                        var idReader = ESTestCase.asInstanceOf(
+                            BlockLoader.OptionalColumnAtATimeReader.class,
+                            leaf.reader().getNumericDocValues("id")
+                        );
+                        TestBlock idBlock = (TestBlock) idReader.tryRead(factory, docs, 0);
+                        assertNotNull(idBlock);
+                        var reader2 = ESTestCase.asInstanceOf(
+                            BlockLoader.OptionalColumnAtATimeReader.class,
+                            leaf.reader().getSortedDocValues(secondField)
+                        );
+                        assertNull(reader2.tryRead(factory, docs, 0));
+                        var reader3 = ESTestCase.asInstanceOf(
+                            BlockLoader.OptionalColumnAtATimeReader.class,
+                            leaf.reader().getSortedDocValues(unsortedField)
+                        );
+                        assertNull(reader3.tryRead(factory, docs, 0));
+                        for (int offset = 0; offset < idBlock.size(); offset += ESTestCase.between(1, numDocs)) {
+                            int start = offset;
+                            var reader1 = ESTestCase.asInstanceOf(
+                                BlockLoader.OptionalColumnAtATimeReader.class,
+                                leaf.reader().getSortedDocValues(primaryField)
+                            );
+                            while (start < idBlock.size()) {
+                                int end = start + random().nextInt(idBlock.size() - start);
+                                TestBlock hostBlock = (TestBlock) reader1.tryRead(factory, new BlockLoader.Docs() {
+                                    @Override
+                                    public int count() {
+                                        return end + 1;
+                                    }
+
+                                    @Override
+                                    public int get(int docId) {
+                                        return docId;
+                                    }
+                                }, start);
+                                Set<String> seenValues = new HashSet<>();
+                                for (int p = start; p <= end; p++) {
+                                    String hostName = hostnames.get(((Number) idBlock.get(p)).intValue());
+                                    seenValues.add(hostName);
+                                }
+                                if (seenValues.size() == 1) {
+                                    assertNotNull(hostBlock);
+                                    assertThat(hostBlock.size(), equalTo(end - start + 1));
+                                    for (int i = 0; i < hostBlock.size(); i++) {
+                                        String actualHostName = BytesRefs.toString(hostBlock.get(i));
+                                        assertThat(actualHostName, equalTo(hostnames.get(((Number) idBlock.get(i + start)).intValue())));
+                                    }
+                                } else {
+                                    assertNull(hostBlock);
+                                }
+                                if (start == idBlock.size() - 1) {
+                                    break;
+                                }
+                                start = end + ESTestCase.between(0, 10);
+                            }
+                        }
+                        writer.forceMerge(1);
+                    }
+                }
+            }
+        }
+    }
+
+    private static ES819TSDBDocValuesProducer.BaseDenseNumericValues getColumnAtTimeReader(LeafReader leafReader, String counterField)
+        throws IOException {
+        return (ES819TSDBDocValuesProducer.BaseDenseNumericValues) DocValues.unwrapSingleton(
+            leafReader.getSortedNumericDocValues(counterField)
+        );
     }
 
     private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) {

+ 4 - 5
server/src/test/java/org/elasticsearch/index/mapper/DateFieldMapperTests.java

@@ -31,7 +31,6 @@ import org.elasticsearch.core.Strings;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
-import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
 import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
 import org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
 import org.elasticsearch.script.DateFieldScript;
@@ -849,7 +848,7 @@ public class DateFieldMapperTests extends MapperTestCase {
                 {
                     // One big doc block
                     var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
-                    assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+                    assertThat(columnReader.numericDocValues, instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
                     var docBlock = TestBlock.docs(IntStream.range(from, to).toArray());
                     var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
                     assertThat(block.size(), equalTo(to - from));
@@ -861,7 +860,7 @@ public class DateFieldMapperTests extends MapperTestCase {
                     // Smaller doc blocks
                     int docBlockSize = 1000;
                     var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
-                    assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+                    assertThat(columnReader.numericDocValues, instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
                     for (int i = from; i < to; i += docBlockSize) {
                         var docBlock = TestBlock.docs(IntStream.range(i, i + docBlockSize).toArray());
                         var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
@@ -875,7 +874,7 @@ public class DateFieldMapperTests extends MapperTestCase {
                 {
                     // One smaller doc block:
                     var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
-                    assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+                    assertThat(columnReader.numericDocValues, instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
                     var docBlock = TestBlock.docs(IntStream.range(1010, 2020).toArray());
                     var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
                     assertThat(block.size(), equalTo(1010));
@@ -887,7 +886,7 @@ public class DateFieldMapperTests extends MapperTestCase {
                 {
                     // Read two tiny blocks:
                     var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
-                    assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+                    assertThat(columnReader.numericDocValues, instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
                     var docBlock = TestBlock.docs(IntStream.range(32, 64).toArray());
                     var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
                     assertThat(block.size(), equalTo(32));

+ 2 - 3
test/framework/src/main/java/org/elasticsearch/index/mapper/MapperTestCase.java

@@ -43,7 +43,6 @@ import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
-import org.elasticsearch.index.codec.tsdb.es819.BulkNumericDocValues;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshot;
 import org.elasticsearch.index.fielddata.FieldDataContext;
@@ -1541,7 +1540,7 @@ public abstract class MapperTestCase extends MapperServiceTestCase {
                 LeafReaderContext context = reader.leaves().get(0);
                 var blockLoader = mapperService.fieldType("field").blockLoader(mockBlockContext);
                 var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
-                assertThat(columnReader.numericDocValues, instanceOf(BulkNumericDocValues.class));
+                assertThat(columnReader.numericDocValues, instanceOf(BlockLoader.OptionalColumnAtATimeReader.class));
                 var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray());
                 var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
                 for (int i = 0; i < block.size(); i++) {
@@ -1566,7 +1565,7 @@ public abstract class MapperTestCase extends MapperServiceTestCase {
                 LeafReaderContext context = reader.leaves().get(0);
                 var blockLoader = mapperService.fieldType("field").blockLoader(mockBlockContext);
                 var columnReader = (BlockDocValuesReader.SingletonLongs) blockLoader.columnAtATimeReader(context);
-                assertThat(columnReader.numericDocValues, not(instanceOf(BulkNumericDocValues.class)));
+                assertThat(columnReader.numericDocValues, not(instanceOf(BlockLoader.OptionalColumnAtATimeReader.class)));
                 var docBlock = TestBlock.docs(IntStream.range(0, 3).toArray());
                 var block = (TestBlock) columnReader.read(TestBlock.factory(), docBlock, 0);
                 assertThat(block.get(0), equalTo(expectedSampleValues[0]));

+ 0 - 7
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java

@@ -7,10 +7,8 @@
 
 package org.elasticsearch.compute.lucene.read;
 
-import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.core.Releasable;
 
 class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
@@ -35,9 +33,4 @@ class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements
             nullBlock.close();
         }
     }
-
-    @Override
-    public BytesRefBlock constantBytes(BytesRef value, int count) {
-        return factory.newConstantBytesRefBlockWith(value, count);
-    }
 }

+ 27 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java

@@ -9,9 +9,15 @@ package org.elasticsearch.compute.lucene.read;
 
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.BytesRefVector;
 import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.OrdinalBytesRefVector;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.index.mapper.BlockLoader;
 
 public abstract class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
@@ -41,6 +47,27 @@ public abstract class DelegatingBlockLoaderFactory implements BlockLoader.BlockF
         return factory.newBytesRefBlockBuilder(expectedCount);
     }
 
+    @Override
+    public BytesRefBlock constantBytes(BytesRef value, int count) {
+        if (count == 1) {
+            return factory.newConstantBytesRefBlockWith(value, count);
+        }
+        BytesRefVector dict = null;
+        IntVector ordinals = null;
+        boolean success = false;
+        try {
+            dict = factory.newConstantBytesRefVector(value, 1);
+            ordinals = factory.newConstantIntVector(0, count);
+            var result = new OrdinalBytesRefVector(ordinals, dict).asBlock();
+            success = true;
+            return result;
+        } finally {
+            if (success == false) {
+                Releasables.closeExpectNoException(dict, ordinals);
+            }
+        }
+    }
+
     @Override
     public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) {
         return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
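
Rather than materializing count copies of the bytes, constantBytes now builds a one-entry dictionary plus a constant ordinal vector, presumably so downstream operators that understand ordinals can keep working on a single copy of the value; the count == 1 branch keeps the plain constant block, since the indirection buys nothing for one position. A simplified, self-contained model of that representation (plain arrays instead of BytesRefVector/IntVector; illustrative only):

import java.nio.charset.StandardCharsets;

/** Simplified model of the ordinal-encoded constant block built by constantBytes. */
public class ConstantBytesDemo {
    public static void main(String[] args) {
        byte[][] dict = { "host-1".getBytes(StandardCharsets.UTF_8) }; // single dictionary entry
        int count = 5;
        int[] ordinals = new int[count]; // all zeros: every position references dict[0]
        for (int ord : ordinals) {
            System.out.println(new String(dict[ord], StandardCharsets.UTF_8));
        }
        // Storage: one copy of the bytes plus count small ordinals, instead of count copies.
    }
}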

+ 0 - 6
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java

@@ -10,7 +10,6 @@ package org.elasticsearch.compute.lucene.read;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedDocValues;
-import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
@@ -202,11 +201,6 @@ public class TimeSeriesExtractFieldOperator extends AbstractPageMappingOperator
             throw new UnsupportedOperationException("must not be used by column readers");
         }
 
-        @Override
-        public BlockLoader.Block constantBytes(BytesRef value, int count) {
-            throw new UnsupportedOperationException("must not be used by column readers");
-        }
-
         @Override
         public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
             throw new UnsupportedOperationException("must not be used by column readers");