
Introduce FallbackSyntheticSourceBlockLoader and apply it to keyword fields (#119546)

Oleksandr Kolomiiets, 8 months ago
commit e885da1e94

+ 5 - 0
docs/changelog/119546.yaml

@@ -0,0 +1,5 @@
+pr: 119546
+summary: Introduce `FallbackSyntheticSourceBlockLoader` and apply it to keyword fields
+area: Mapping
+type: enhancement
+issues: []

+ 270 - 0
server/src/main/java/org/elasticsearch/index/mapper/FallbackSyntheticSourceBlockLoader.java

@@ -0,0 +1,270 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.mapper;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.elasticsearch.search.fetch.StoredFieldsSpec;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Block loader for fields that use fallback synthetic source implementation.
+ * <br>
+ * Usually fields have doc_values or stored fields, and block loaders use those directly. When neither is available,
+ * we fall back to (potentially synthetic) _source. In the synthetic source case, however, there is no need to
+ * construct the entire _source: because the field has neither doc_values nor stored fields, it must use the fallback
+ * synthetic source implementation, which is equivalent to reading the _ignored_source stored field directly and
+ * performing an in-place synthetic source reconstruction for this field alone.
+ * <br>
+ * See {@link IgnoredSourceFieldMapper}.
+ */
+public abstract class FallbackSyntheticSourceBlockLoader implements BlockLoader {
+    private final Reader<?> reader;
+    private final String fieldName;
+
+    protected FallbackSyntheticSourceBlockLoader(Reader<?> reader, String fieldName) {
+        this.reader = reader;
+        this.fieldName = fieldName;
+    }
+
+    @Override
+    public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
+        return null;
+    }
+
+    @Override
+    public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException {
+        return new IgnoredSourceRowStrideReader<>(fieldName, reader);
+    }
+
+    @Override
+    public StoredFieldsSpec rowStrideStoredFieldSpec() {
+        return new StoredFieldsSpec(false, false, Set.of(IgnoredSourceFieldMapper.NAME));
+    }
+
+    @Override
+    public boolean supportsOrdinals() {
+        return false;
+    }
+
+    @Override
+    public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    private record IgnoredSourceRowStrideReader<T>(String fieldName, Reader<T> reader) implements RowStrideReader {
+        @Override
+        public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
+            var ignoredSource = storedFields.storedFields().get(IgnoredSourceFieldMapper.NAME);
+            if (ignoredSource == null) {
+                return;
+            }
+
+            Map<String, List<IgnoredSourceFieldMapper.NameValue>> valuesForFieldAndParents = new HashMap<>();
+
+            // Contains the name of the field and the names of all its parent objects
+            Set<String> fieldNames = new HashSet<>() {
+                {
+                    add("_doc");
+                }
+            };
+
+            var current = new StringBuilder();
+            for (String part : fieldName.split("\\.")) {
+                if (current.isEmpty() == false) {
+                    current.append('.');
+                }
+                current.append(part);
+                fieldNames.add(current.toString());
+            }
+
+            for (Object value : ignoredSource) {
+                IgnoredSourceFieldMapper.NameValue nameValue = IgnoredSourceFieldMapper.decode(value);
+                if (fieldNames.contains(nameValue.name())) {
+                    valuesForFieldAndParents.computeIfAbsent(nameValue.name(), k -> new ArrayList<>()).add(nameValue);
+                }
+            }
+
+            // TODO figure out how to handle XContentDataHelper#voidValue()
+
+            var blockValues = new ArrayList<T>();
+
+            var leafFieldValue = valuesForFieldAndParents.get(fieldName);
+            if (leafFieldValue != null) {
+                readFromFieldValue(leafFieldValue, blockValues);
+            } else {
+                readFromParentValue(valuesForFieldAndParents, blockValues);
+            }
+
+            if (blockValues.isEmpty() == false) {
+                if (blockValues.size() > 1) {
+                    builder.beginPositionEntry();
+                }
+
+                reader.writeToBlock(blockValues, builder);
+
+                if (blockValues.size() > 1) {
+                    builder.endPositionEntry();
+                }
+            } else {
+                builder.appendNull();
+            }
+        }
+
+        private void readFromFieldValue(List<IgnoredSourceFieldMapper.NameValue> nameValues, List<T> blockValues) throws IOException {
+            if (nameValues.isEmpty()) {
+                return;
+            }
+
+            for (var nameValue : nameValues) {
+                // Leaf field is stored directly (not as a part of a parent object), let's try to decode it.
+                Optional<Object> singleValue = XContentDataHelper.decode(nameValue.value());
+                if (singleValue.isPresent()) {
+                    reader.convertValue(singleValue.get(), blockValues);
+                    continue;
+                }
+
+                // We have a value for this field but it's an array or an object
+                var type = XContentDataHelper.decodeType(nameValue.value());
+                assert type.isPresent();
+
+                try (
+                    XContentParser parser = type.get()
+                        .xContent()
+                        .createParser(
+                            XContentParserConfiguration.EMPTY,
+                            nameValue.value().bytes,
+                            nameValue.value().offset + 1,
+                            nameValue.value().length - 1
+                        )
+                ) {
+                    parser.nextToken();
+                    parseWithReader(parser, blockValues);
+                }
+            }
+        }
+
+        private void readFromParentValue(
+            Map<String, List<IgnoredSourceFieldMapper.NameValue>> valuesForFieldAndParents,
+            List<T> blockValues
+        ) throws IOException {
+            if (valuesForFieldAndParents.isEmpty()) {
+                return;
+            }
+
+            // If a parent object is stored at a particular level its children won't be stored.
+            // So we should only ever have one parent here.
+            assert valuesForFieldAndParents.size() == 1 : "_ignored_source field contains multiple levels of the same object";
+            var parentValues = valuesForFieldAndParents.values().iterator().next();
+
+            for (var nameValue : parentValues) {
+                parseFieldFromParent(nameValue, blockValues);
+            }
+        }
+
+        private void parseFieldFromParent(IgnoredSourceFieldMapper.NameValue nameValue, List<T> blockValues) throws IOException {
+            var type = XContentDataHelper.decodeType(nameValue.value());
+            assert type.isPresent();
+
+            String nameAtThisLevel = fieldName.substring(nameValue.name().length() + 1);
+            var filterParserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.of(nameAtThisLevel), Set.of(), true);
+            try (
+                XContentParser parser = type.get()
+                    .xContent()
+                    .createParser(filterParserConfig, nameValue.value().bytes, nameValue.value().offset + 1, nameValue.value().length - 1)
+            ) {
+                parser.nextToken();
+                var fieldNameInParser = new StringBuilder(nameValue.name());
+                while (true) {
+                    if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
+                        fieldNameInParser.append('.').append(parser.currentName());
+                        if (fieldNameInParser.toString().equals(fieldName)) {
+                            parser.nextToken();
+                            break;
+                        }
+                    }
+                    parser.nextToken();
+                }
+                parseWithReader(parser, blockValues);
+            }
+        }
+
+        private void parseWithReader(XContentParser parser, List<T> blockValues) throws IOException {
+            if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
+                while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
+                    reader.parse(parser, blockValues);
+                }
+                return;
+            }
+
+            reader.parse(parser, blockValues);
+        }
+
+        @Override
+        public boolean canReuse(int startingDocID) {
+            return true;
+        }
+    }
+
+    /**
+     * Field-specific implementation that converts data stored in the _ignored_source field to block loader values.
+     * @param <T> type of values this reader produces for the block loader
+     */
+    public interface Reader<T> {
+        /**
+         * Converts a raw stored value for this field into a format suitable for the block loader and adds it to the provided
+         * accumulator.
+         * @param value raw decoded value from _ignored_source field (synthetic _source value)
+         * @param accumulator list containing the result of conversion
+         */
+        void convertValue(Object value, List<T> accumulator);
+
+        /**
+         * Parses one or more complex values using a provided parser and adds them to the provided accumulator.
+         * @param parser parser of a value from _ignored_source field (synthetic _source value)
+         * @param accumulator list containing the results of parsing
+         */
+        void parse(XContentParser parser, List<T> accumulator) throws IOException;
+
+        void writeToBlock(List<T> values, Builder blockBuilder);
+    }
+
+    public abstract static class ReaderWithNullValueSupport<T> implements Reader<T> {
+        private final T nullValue;
+
+        public ReaderWithNullValueSupport(T nullValue) {
+            this.nullValue = nullValue;
+        }
+
+        @Override
+        public void parse(XContentParser parser, List<T> accumulator) throws IOException {
+            if (parser.currentToken() == XContentParser.Token.VALUE_NULL) {
+                if (nullValue != null) {
+                    convertValue(nullValue, accumulator);
+                }
+                return;
+            }
+
+            parseNonNullValue(parser, accumulator);
+        }
+
+        abstract void parseNonNullValue(XContentParser parser, List<T> accumulator) throws IOException;
+    }
+}
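
For orientation, a minimal sketch of a field-specific Reader, assuming a class in the same package (the name ExampleStringReader is hypothetical and not part of this commit); the keyword reader added in KeywordFieldMapper below follows the same shape:

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;

// Hypothetical sketch; assumes the org.elasticsearch.index.mapper package so the
// package-private parseNonNullValue can be overridden.
class ExampleStringReader extends FallbackSyntheticSourceBlockLoader.ReaderWithNullValueSupport<BytesRef> {
    ExampleStringReader() {
        super(null); // no null_value substitution in this sketch
    }

    @Override
    public void convertValue(Object value, List<BytesRef> accumulator) {
        // Scalar strings decoded from _ignored_source arrive as BytesRef values.
        accumulator.add((BytesRef) value);
    }

    @Override
    void parseNonNullValue(XContentParser parser, List<BytesRef> accumulator) throws IOException {
        accumulator.add(new BytesRef(parser.text()));
    }

    @Override
    public void writeToBlock(List<BytesRef> values, BlockLoader.Builder blockBuilder) {
        var bytesRefBuilder = (BlockLoader.BytesRefBuilder) blockBuilder;
        for (BytesRef value : values) {
            bytesRefBuilder.appendBytesRef(value);
        }
    }
}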

+ 55 - 5
server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java

@@ -67,6 +67,7 @@ import org.elasticsearch.search.runtime.StringScriptFieldRegexpQuery;
 import org.elasticsearch.search.runtime.StringScriptFieldTermQuery;
 import org.elasticsearch.search.runtime.StringScriptFieldWildcardQuery;
 import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -74,6 +75,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
@@ -738,10 +740,54 @@ public final class KeywordFieldMapper extends FieldMapper {
             if (isStored()) {
                 return new BlockStoredFieldsReader.BytesFromBytesRefsBlockLoader(name());
             }
+
+            if (isSyntheticSource) {
+                return new FallbackSyntheticSourceBlockLoader(fallbackSyntheticSourceBlockLoaderReader(), name()) {
+                    @Override
+                    public Builder builder(BlockFactory factory, int expectedCount) {
+                        return factory.bytesRefs(expectedCount);
+                    }
+                };
+            }
+
             SourceValueFetcher fetcher = sourceValueFetcher(blContext.sourcePaths(name()));
             return new BlockSourceReader.BytesRefsBlockLoader(fetcher, sourceBlockLoaderLookup(blContext));
         }
 
+        private FallbackSyntheticSourceBlockLoader.Reader<?> fallbackSyntheticSourceBlockLoaderReader() {
+            var nullValueBytes = nullValue != null ? new BytesRef(nullValue) : null;
+            return new FallbackSyntheticSourceBlockLoader.ReaderWithNullValueSupport<>(nullValueBytes) {
+                @Override
+                public void convertValue(Object value, List<BytesRef> accumulator) {
+                    String stringValue = ((BytesRef) value).utf8ToString();
+                    String adjusted = applyIgnoreAboveAndNormalizer(stringValue);
+                    if (adjusted != null) {
+                        // TODO what if the value didn't change?
+                        accumulator.add(new BytesRef(adjusted));
+                    }
+                }
+
+                @Override
+                public void parseNonNullValue(XContentParser parser, List<BytesRef> accumulator) throws IOException {
+                    assert parser.currentToken() == XContentParser.Token.VALUE_STRING : "Unexpected token " + parser.currentToken();
+
+                    var value = applyIgnoreAboveAndNormalizer(parser.text());
+                    if (value != null) {
+                        accumulator.add(new BytesRef(value));
+                    }
+                }
+
+                @Override
+                public void writeToBlock(List<BytesRef> values, BlockLoader.Builder blockBuilder) {
+                    var bytesRefBuilder = (BlockLoader.BytesRefBuilder) blockBuilder;
+
+                    for (var value : values) {
+                        bytesRefBuilder.appendBytesRef(value);
+                    }
+                }
+            };
+        }
+
         private BlockSourceReader.LeafIteratorLookup sourceBlockLoaderLookup(BlockLoaderContext blContext) {
             if (getTextSearchInfo().hasNorms()) {
                 return BlockSourceReader.lookupFromNorms(name());
@@ -821,15 +867,19 @@ public final class KeywordFieldMapper extends FieldMapper {
                 @Override
                 protected String parseSourceValue(Object value) {
                     String keywordValue = value.toString();
-                    if (keywordValue.length() > ignoreAbove) {
-                        return null;
-                    }
-
-                    return normalizeValue(normalizer(), name(), keywordValue);
+                    return applyIgnoreAboveAndNormalizer(keywordValue);
                 }
             };
         }
 
+        private String applyIgnoreAboveAndNormalizer(String value) {
+            if (value.length() > ignoreAbove) {
+                return null;
+            }
+
+            return normalizeValue(normalizer(), name(), value);
+        }
+
         @Override
         public Object valueForDisplay(Object value) {
             if (value == null) {
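
To make the shared semantics explicit, a standalone restatement of the extracted helper, hedged as a sketch (the static method and explicit normalizer parameter are hypothetical; the real helper uses the field's configured normalizer via normalizeValue):

import java.util.function.UnaryOperator;

// Hypothetical restatement of applyIgnoreAboveAndNormalizer, for illustration only:
// values longer than ignore_above load as null; surviving values are normalized.
final class IgnoreAboveExample {
    static String applyIgnoreAboveAndNormalize(String value, int ignoreAbove, UnaryOperator<String> normalizer) {
        if (value.length() > ignoreAbove) {
            return null;
        }
        return normalizer.apply(value);
    }

    public static void main(String[] args) {
        System.out.println(applyIgnoreAboveAndNormalize("Ab", 4, String::toLowerCase));    // prints: ab
        System.out.println(applyIgnoreAboveAndNormalize("abcde", 4, String::toLowerCase)); // prints: null
    }
}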

+ 92 - 0
server/src/main/java/org/elasticsearch/index/mapper/XContentDataHelper.java

@@ -110,6 +110,28 @@ public final class XContentDataHelper {
         }
     }
 
+    /**
+     * Decode the value in the passed {@link BytesRef} in place and return it.
+     * Returns {@link Optional#empty()} for complex values (objects and arrays).
+     */
+    static Optional<Object> decode(BytesRef r) {
+        return switch ((char) r.bytes[r.offset]) {
+            case BINARY_ENCODING -> Optional.of(TypeUtils.EMBEDDED_OBJECT.decode(r));
+            case CBOR_OBJECT_ENCODING, JSON_OBJECT_ENCODING, YAML_OBJECT_ENCODING, SMILE_OBJECT_ENCODING -> Optional.empty();
+            case BIG_DECIMAL_ENCODING -> Optional.of(TypeUtils.BIG_DECIMAL.decode(r));
+            case FALSE_ENCODING, TRUE_ENCODING -> Optional.of(TypeUtils.BOOLEAN.decode(r));
+            case BIG_INTEGER_ENCODING -> Optional.of(TypeUtils.BIG_INTEGER.decode(r));
+            case STRING_ENCODING -> Optional.of(TypeUtils.STRING.decode(r));
+            case INTEGER_ENCODING -> Optional.of(TypeUtils.INTEGER.decode(r));
+            case LONG_ENCODING -> Optional.of(TypeUtils.LONG.decode(r));
+            case DOUBLE_ENCODING -> Optional.of(TypeUtils.DOUBLE.decode(r));
+            case FLOAT_ENCODING -> Optional.of(TypeUtils.FLOAT.decode(r));
+            case NULL_ENCODING -> Optional.ofNullable(TypeUtils.NULL.decode(r));
+            case VOID_ENCODING -> Optional.of(TypeUtils.VOID.decode(r));
+            default -> throw new IllegalArgumentException("Can't decode " + r);
+        };
+    }
+
     /**
      * Determines if the given {@link BytesRef}, encoded with {@link XContentDataHelper#encodeToken(XContentParser)},
      * is an encoded object.
@@ -339,6 +361,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                return new BytesRef(r.bytes, r.offset + 1, r.length - 1);
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 b.value(new BytesRef(r.bytes, r.offset + 1, r.length - 1).utf8ToString());
@@ -359,6 +386,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                return ByteUtils.readIntLE(r.bytes, 1 + r.offset);
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 b.value(ByteUtils.readIntLE(r.bytes, 1 + r.offset));
@@ -379,6 +411,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                return ByteUtils.readLongLE(r.bytes, 1 + r.offset);
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 b.value(ByteUtils.readLongLE(r.bytes, 1 + r.offset));
@@ -399,6 +436,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                return ByteUtils.readDoubleLE(r.bytes, 1 + r.offset);
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 b.value(ByteUtils.readDoubleLE(r.bytes, 1 + r.offset));
@@ -419,6 +461,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                return ByteUtils.readFloatLE(r.bytes, 1 + r.offset);
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 b.value(ByteUtils.readFloatLE(r.bytes, 1 + r.offset));
@@ -437,6 +484,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                return new BigInteger(r.bytes, r.offset + 1, r.length - 1);
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 b.value(new BigInteger(r.bytes, r.offset + 1, r.length - 1));
@@ -455,6 +507,15 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                if (r.length < 5) {
+                    throw new IllegalArgumentException("Can't decode " + r);
+                }
+                int scale = ByteUtils.readIntLE(r.bytes, r.offset + 1);
+                return new BigDecimal(new BigInteger(r.bytes, r.offset + 5, r.length - 5), scale);
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 if (r.length < 5) {
@@ -477,6 +538,15 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                if (r.length != 1) {
+                    throw new IllegalArgumentException("Can't decode " + r);
+                }
+                assert r.bytes[r.offset] == 't' || r.bytes[r.offset] == 'f' : r.bytes[r.offset];
+                return r.bytes[r.offset] == 't';
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 if (r.length != 1) {
@@ -499,6 +569,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                return null;
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 b.nullValue();
@@ -517,6 +592,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                return new BytesRef(r.bytes, r.offset + 1, r.length - 1);
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 b.value(r.bytes, r.offset + 1, r.length - 1);
@@ -538,6 +618,11 @@ public final class XContentDataHelper {
                 }
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                throw new UnsupportedOperationException();
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException {
                 switch ((char) r.bytes[r.offset]) {
@@ -562,6 +647,11 @@ public final class XContentDataHelper {
                 return bytes;
             }
 
+            @Override
+            Object decode(BytesRef r) {
+                throw new UnsupportedOperationException();
+            }
+
             @Override
             void decodeAndWrite(XContentBuilder b, BytesRef r) {
                 // NOOP
@@ -591,6 +681,8 @@ public final class XContentDataHelper {
 
         abstract byte[] encode(XContentParser parser) throws IOException;
 
+        abstract Object decode(BytesRef r);
+
         abstract void decodeAndWrite(XContentBuilder b, BytesRef r) throws IOException;
 
         static byte[] encode(BigInteger n, Byte encoding) throws IOException {
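
The decode implementations above mirror the existing encode/decodeAndWrite pairs: the first byte of an encoded BytesRef is a type tag and the payload follows. A hedged round-trip sketch, grounded in the boolean case shown above (a single 't' or 'f' tag byte, no payload) and assuming same-package access since decode is package-private:

import org.apache.lucene.util.BytesRef;

import java.util.Optional;

// Illustrative sketch only; DecodeExample is hypothetical.
class DecodeExample {
    static void demo() {
        // TypeUtils.BOOLEAN encodes a boolean as a single tag byte: 't' for true, 'f' for false.
        BytesRef encodedTrue = new BytesRef(new byte[] { (byte) 't' });
        Optional<Object> decoded = XContentDataHelper.decode(encodedTrue);
        assert decoded.equals(Optional.of(true));
    }
}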

+ 17 - 11
server/src/test/java/org/elasticsearch/index/mapper/blockloader/KeywordFieldBlockLoaderTests.java

@@ -13,7 +13,6 @@ import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.index.mapper.BlockLoaderTestCase;
 import org.elasticsearch.logsdb.datageneration.FieldType;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -28,27 +27,30 @@ public class KeywordFieldBlockLoaderTests extends BlockLoaderTestCase {
     @SuppressWarnings("unchecked")
     @Override
     protected Object expected(Map<String, Object> fieldMapping, Object value, boolean syntheticSource) {
-        if (value == null) {
-            return null;
-        }
+        var nullValue = (String) fieldMapping.get("null_value");
 
         var ignoreAbove = fieldMapping.get("ignore_above") == null
             ? Integer.MAX_VALUE
             : ((Number) fieldMapping.get("ignore_above")).intValue();
 
+        if (value == null) {
+            return convert(null, nullValue, ignoreAbove);
+        }
+
         if (value instanceof String s) {
-            return convert(s, ignoreAbove);
+            return convert(s, nullValue, ignoreAbove);
         }
 
-        Function<Stream<String>, Stream<BytesRef>> convertValues = s -> s.map(v -> convert(v, ignoreAbove)).filter(Objects::nonNull);
+        Function<Stream<String>, Stream<BytesRef>> convertValues = s -> s.map(v -> convert(v, nullValue, ignoreAbove))
+            .filter(Objects::nonNull);
 
         if ((boolean) fieldMapping.getOrDefault("doc_values", false)) {
             // Sorted and no duplicates
 
-            var values = new HashSet<>((List<String>) value);
-            var resultList = convertValues.compose(s -> values.stream().filter(Objects::nonNull).sorted())
+            var resultList = convertValues.andThen(Stream::distinct)
+                .andThen(Stream::sorted)
                 .andThen(Stream::toList)
-                .apply(values.stream());
+                .apply(((List<String>) value).stream());
             return maybeFoldList(resultList);
         }
 
@@ -69,9 +71,13 @@ public class KeywordFieldBlockLoaderTests extends BlockLoaderTestCase {
         return list;
     }
 
-    private BytesRef convert(String value, int ignoreAbove) {
+    private BytesRef convert(String value, String nullValue, int ignoreAbove) {
         if (value == null) {
-            return null;
+            if (nullValue != null) {
+                value = nullValue;
+            } else {
+                return null;
+            }
         }
 
         return value.length() <= ignoreAbove ? new BytesRef(value) : null;

+ 99 - 16
test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestCase.java

@@ -13,82 +13,159 @@ import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
 import org.elasticsearch.logsdb.datageneration.DataGeneratorSpecification;
-import org.elasticsearch.logsdb.datageneration.FieldDataGenerator;
+import org.elasticsearch.logsdb.datageneration.DocumentGenerator;
 import org.elasticsearch.logsdb.datageneration.FieldType;
 import org.elasticsearch.logsdb.datageneration.MappingGenerator;
 import org.elasticsearch.logsdb.datageneration.Template;
 import org.elasticsearch.logsdb.datageneration.datasource.DataSourceHandler;
 import org.elasticsearch.logsdb.datageneration.datasource.DataSourceRequest;
 import org.elasticsearch.logsdb.datageneration.datasource.DataSourceResponse;
+import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.search.fetch.StoredFieldsSpec;
 import org.elasticsearch.search.lookup.SearchLookup;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Stream;
 
 public abstract class BlockLoaderTestCase extends MapperServiceTestCase {
+    private final FieldType fieldType;
     private final String fieldName;
-    private final Template template;
     private final MappingGenerator mappingGenerator;
-    private final FieldDataGenerator generator;
+    private final DocumentGenerator documentGenerator;
 
     protected BlockLoaderTestCase(FieldType fieldType) {
+        this.fieldType = fieldType;
         this.fieldName = randomAlphaOfLengthBetween(5, 10);
 
-        // Disable all dynamic mapping
         var specification = DataGeneratorSpecification.builder()
             .withFullyDynamicMapping(false)
+            // Disable dynamic mapping and don't generate disabled objects
             .withDataSourceHandlers(List.of(new DataSourceHandler() {
                 @Override
                 public DataSourceResponse.DynamicMappingGenerator handle(DataSourceRequest.DynamicMappingGenerator request) {
                     return new DataSourceResponse.DynamicMappingGenerator(isObject -> false);
                 }
+
+                @Override
+                public DataSourceResponse.ObjectMappingParametersGenerator handle(
+                    DataSourceRequest.ObjectMappingParametersGenerator request
+                ) {
+                    return new DataSourceResponse.ObjectMappingParametersGenerator(HashMap::new); // just defaults
+                }
             }))
             .build();
 
-        this.template = new Template(Map.of(fieldName, new Template.Leaf(fieldName, fieldType)));
         this.mappingGenerator = new MappingGenerator(specification);
-        this.generator = fieldType.generator(fieldName, specification.dataSource());
+        this.documentGenerator = new DocumentGenerator(specification);
     }
 
     public void testBlockLoader() throws IOException {
+        var template = new Template(Map.of(fieldName, new Template.Leaf(fieldName, fieldType)));
+        runTest(template, fieldName);
+    }
+
+    public void testBlockLoaderForFieldInObject() throws IOException {
+        int depth = randomIntBetween(0, 3);
+
+        Map<String, Template.Entry> currentLevel = new HashMap<>();
+        Map<String, Template.Entry> top = Map.of("top", new Template.Object("top", false, currentLevel));
+
+        var fullFieldName = new StringBuilder("top");
+        int currentDepth = 0;
+        while (currentDepth++ < depth) {
+            fullFieldName.append('.').append("level").append(currentDepth);
+
+            Map<String, Template.Entry> nextLevel = new HashMap<>();
+            currentLevel.put("level" + currentDepth, new Template.Object("level" + currentDepth, false, nextLevel));
+            currentLevel = nextLevel;
+        }
+
+        fullFieldName.append('.').append(fieldName);
+        currentLevel.put(fieldName, new Template.Leaf(fieldName, fieldType));
+        var template = new Template(top);
+        runTest(template, fullFieldName.toString());
+    }
+
+    private void runTest(Template template, String fieldName) throws IOException {
         var mapping = mappingGenerator.generate(template);
         var mappingXContent = XContentBuilder.builder(XContentType.JSON.xContent()).map(mapping.raw());
 
         var syntheticSource = randomBoolean();
         var mapperService = syntheticSource ? createSytheticSourceMapperService(mappingXContent) : createMapperService(mappingXContent);
 
-        var fieldValue = generator.generateValue();
+        var document = documentGenerator.generate(template, mapping);
+        var documentXContent = XContentBuilder.builder(XContentType.JSON.xContent()).map(document);
 
-        Object blockLoaderResult = setupAndInvokeBlockLoader(mapperService, fieldValue);
-        Object expected = expected(mapping.lookup().get(fieldName), fieldValue, syntheticSource);
+        Object blockLoaderResult = setupAndInvokeBlockLoader(mapperService, documentXContent, fieldName);
+        Object expected = expected(mapping.lookup().get(fieldName), getFieldValue(document, fieldName), syntheticSource);
         assertEquals(expected, blockLoaderResult);
     }
 
     protected abstract Object expected(Map<String, Object> fieldMapping, Object value, boolean syntheticSource);
 
-    private Object setupAndInvokeBlockLoader(MapperService mapperService, Object fieldValue) throws IOException {
+    private Object getFieldValue(Map<String, Object> document, String fieldName) {
+        var rawValues = new ArrayList<>();
+        processLevel(document, fieldName, rawValues);
+
+        if (rawValues.size() == 1) {
+            return rawValues.get(0);
+        }
+
+        return rawValues.stream().flatMap(v -> v instanceof List<?> l ? l.stream() : Stream.of(v)).toList();
+    }
+
+    @SuppressWarnings("unchecked")
+    private void processLevel(Map<String, Object> level, String field, ArrayList<Object> values) {
+        if (field.contains(".") == false) {
+            var value = level.get(field);
+            values.add(value);
+            return;
+        }
+
+        var nameInLevel = field.split("\\.")[0];
+        var entry = level.get(nameInLevel);
+        if (entry instanceof Map<?, ?> m) {
+            processLevel((Map<String, Object>) m, field.substring(field.indexOf('.') + 1), values);
+        }
+        if (entry instanceof List<?> l) {
+            for (var object : l) {
+                processLevel((Map<String, Object>) object, field.substring(field.indexOf('.') + 1), values);
+            }
+        }
+    }
+
+    private Object setupAndInvokeBlockLoader(MapperService mapperService, XContentBuilder document, String fieldName) throws IOException {
         try (Directory directory = newDirectory()) {
             RandomIndexWriter iw = new RandomIndexWriter(random(), directory);
 
-            LuceneDocument doc = mapperService.documentMapper().parse(source(b -> {
-                b.field(fieldName);
-                b.value(fieldValue);
-            })).rootDoc();
+            var source = new SourceToParse(
+                "1",
+                BytesReference.bytes(document),
+                XContentType.JSON,
+                null,
+                Map.of(),
+                true,
+                XContentMeteringParserDecorator.NOOP
+            );
+            LuceneDocument doc = mapperService.documentMapper().parse(source).rootDoc();
 
             iw.addDocument(doc);
             iw.close();
 
             try (DirectoryReader reader = DirectoryReader.open(directory)) {
                 LeafReaderContext context = reader.leaves().get(0);
-                return load(createBlockLoader(mapperService), context, mapperService);
+                return load(createBlockLoader(mapperService, fieldName), context, mapperService);
             }
         }
     }
@@ -98,6 +175,9 @@ public abstract class BlockLoaderTestCase extends MapperServiceTestCase {
         var columnAtATimeReader = blockLoader.columnAtATimeReader(context);
         if (columnAtATimeReader != null) {
             var block = (TestBlock) columnAtATimeReader.read(TestBlock.factory(context.reader().numDocs()), TestBlock.docs(0));
+            if (block.size() == 0) {
+                return null;
+            }
             return block.get(0);
         }
 
@@ -119,10 +199,13 @@ public abstract class BlockLoaderTestCase extends MapperServiceTestCase {
         BlockLoader.Builder builder = blockLoader.builder(TestBlock.factory(context.reader().numDocs()), 1);
         blockLoader.rowStrideReader(context).read(0, storedFieldsLoader, builder);
         var block = (TestBlock) builder.build();
+        if (block.size() == 0) {
+            return null;
+        }
         return block.get(0);
     }
 
-    private BlockLoader createBlockLoader(MapperService mapperService) {
+    private BlockLoader createBlockLoader(MapperService mapperService, String fieldName) {
         SearchLookup searchLookup = new SearchLookup(mapperService.mappingLookup().fieldTypesLookup()::get, null, null);
 
         return mapperService.fieldType(fieldName).blockLoader(new MappedFieldType.BlockLoaderContext() {

+ 3 - 0
test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/datasource/DefaultMappingParametersHandler.java

@@ -64,6 +64,9 @@ public class DefaultMappingParametersHandler implements DataSourceHandler {
             if (ESTestCase.randomDouble() <= 0.2) {
                 injected.put("ignore_above", ESTestCase.randomIntBetween(1, 100));
             }
+            if (ESTestCase.randomDouble() <= 0.2) {
+                injected.put("null_value", ESTestCase.randomAlphaOfLengthBetween(0, 10));
+            }
 
             return injected;
         };

+ 59 - 0
test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/matchers/source/FieldSpecificMatcher.java

@@ -288,4 +288,63 @@ interface FieldSpecificMatcher {
                 );
         }
     }
+
+    class KeywordMatcher implements FieldSpecificMatcher {
+        private final XContentBuilder actualMappings;
+        private final Settings.Builder actualSettings;
+        private final XContentBuilder expectedMappings;
+        private final Settings.Builder expectedSettings;
+
+        KeywordMatcher(
+            XContentBuilder actualMappings,
+            Settings.Builder actualSettings,
+            XContentBuilder expectedMappings,
+            Settings.Builder expectedSettings
+        ) {
+            this.actualMappings = actualMappings;
+            this.actualSettings = actualSettings;
+            this.expectedMappings = expectedMappings;
+            this.expectedSettings = expectedSettings;
+        }
+
+        @Override
+        public MatchResult match(
+            List<Object> actual,
+            List<Object> expected,
+            Map<String, Object> actualMapping,
+            Map<String, Object> expectedMapping
+        ) {
+            var nullValue = actualMapping.get("null_value");
+            var expectedNullValue = expectedMapping.get("null_value");
+            if (Objects.equals(nullValue, expectedNullValue) == false) {
+                throw new IllegalStateException(
+                    "[null_value] parameter for [keyword] field does not match between actual and expected mapping"
+                );
+            }
+
+            var expectedNormalized = normalize(expected, (String) nullValue);
+            var actualNormalized = normalize(actual, (String) nullValue);
+
+            return actualNormalized.equals(expectedNormalized)
+                ? MatchResult.match()
+                : MatchResult.noMatch(
+                    formatErrorMessage(
+                        actualMappings,
+                        actualSettings,
+                        expectedMappings,
+                        expectedSettings,
+                        "Values of type [keyword] don't match after normalization, normalized "
+                            + prettyPrintCollections(actualNormalized, expectedNormalized)
+                    )
+                );
+        }
+
+        private static Set<String> normalize(List<Object> values, String nullValue) {
+            if (values == null) {
+                return Set.of();
+            }
+
+            return values.stream().map(v -> v == null ? nullValue : (String) v).filter(Objects::nonNull).collect(Collectors.toSet());
+        }
+    }
 }

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/logsdb/datageneration/matchers/source/SourceMatcher.java

@@ -59,7 +59,9 @@ public class SourceMatcher extends GenericEqualsMatcher<List<Map<String, Object>
             "unsigned_long",
             new FieldSpecificMatcher.UnsignedLongMatcher(actualMappings, actualSettings, expectedMappings, expectedSettings),
             "counted_keyword",
-            new FieldSpecificMatcher.CountedKeywordMatcher(actualMappings, actualSettings, expectedMappings, expectedSettings)
+            new FieldSpecificMatcher.CountedKeywordMatcher(actualMappings, actualSettings, expectedMappings, expectedSettings),
+            "keyword",
+            new FieldSpecificMatcher.KeywordMatcher(actualMappings, actualSettings, expectedMappings, expectedSettings)
         );
         this.dynamicFieldMatcher = new DynamicFieldMatcher(actualMappings, actualSettings, expectedMappings, expectedSettings);
     }