Browse Source

Ignore conflicting fields during dynamic mapping update (#114227) (#116194)

This fixes a bug when concurrently executing index requests that have different types for the same field.

(cherry picked from commit 9658940a518099f3e7f1b9e8d0f802a4be788094)

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Felix Barnsteiner 11 months ago
parent
commit
633e7d831e

+ 6 - 0
docs/changelog/114227.yaml

@@ -0,0 +1,6 @@
+pr: 114227
+summary: Ignore conflicting fields during dynamic mapping update
+area: Mapping
+type: bug
+issues:
+ - 114228

+ 31 - 0
server/src/internalClusterTest/java/org/elasticsearch/index/mapper/DynamicMappingIT.java

@@ -63,6 +63,8 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.oneOf;
 
 public class DynamicMappingIT extends ESIntegTestCase {
 
@@ -190,6 +192,35 @@ public class DynamicMappingIT extends ESIntegTestCase {
         return properties;
     }
 
+    public void testConcurrentDynamicMappingsWithConflictingType() throws Throwable {
+        int numberOfDocsToCreate = 16;
+        indicesAdmin().prepareCreate("index").setSettings(Settings.builder()).get();
+        ensureGreen("index");
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        startInParallel(numberOfDocsToCreate, i -> {
+            try {
+                assertEquals(
+                    DocWriteResponse.Result.CREATED,
+                    prepareIndex("index").setId(Integer.toString(i)).setSource("field" + i, 0, "field" + (i + 1), 0.1).get().getResult()
+                );
+            } catch (Exception e) {
+                error.compareAndSet(null, e);
+            }
+        });
+        if (error.get() != null) {
+            throw error.get();
+        }
+        client().admin().indices().prepareRefresh("index").get();
+        for (int i = 0; i < numberOfDocsToCreate; ++i) {
+            assertTrue(client().prepareGet("index", Integer.toString(i)).get().isExists());
+        }
+        Map<String, Object> index = indicesAdmin().prepareGetMappings("index").get().getMappings().get("index").getSourceAsMap();
+        for (int i = 0, j = 1; i < numberOfDocsToCreate; i++, j++) {
+            assertThat(new WriteField("properties.field" + i + ".type", () -> index).get(null), is(oneOf("long", "float")));
+            assertThat(new WriteField("properties.field" + j + ".type", () -> index).get(null), is(oneOf("long", "float")));
+        }
+    }
+
     public void testPreflightCheckAvoidsMaster() throws InterruptedException, IOException {
         // can't use INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING nor INDEX_MAPPING_DEPTH_LIMIT_SETTING as a check here, as that is already
         // checked at parse time, see testTotalFieldsLimitForDynamicMappingsUpdateCheckedAtDocumentParseTime

+ 26 - 0
server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java

@@ -9,6 +9,8 @@
 
 package org.elasticsearch.index.mapper;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.ElasticsearchParseException;
@@ -41,6 +43,7 @@ import java.util.TreeMap;
 import java.util.stream.Stream;
 
 public class ObjectMapper extends Mapper {
+    private static final Logger logger = LogManager.getLogger(ObjectMapper.class);
     private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(ObjectMapper.class);
     public static final FeatureFlag SUB_OBJECTS_AUTO_FEATURE_FLAG = new FeatureFlag("sub_objects_auto");
 
@@ -679,6 +682,13 @@ public class ObjectMapper extends Mapper {
                     // replaces an existing one.
                     if (objectMergeContext.getMapperBuilderContext().getMergeReason() == MergeReason.INDEX_TEMPLATE) {
                         putMergedMapper(mergedMappers, mergeWithMapper);
+                    } else if (isConflictingDynamicMapping(objectMergeContext, mergeWithMapper, mergeIntoMapper)) {
+                        logger.trace(
+                            "ignoring conflicting dynamic mapping update for field={} current_type={} new_type={}",
+                            mergeIntoMapper.fullPath(),
+                            mergeIntoMapper.typeName(),
+                            mergeWithMapper.typeName()
+                        );
                     } else {
                         putMergedMapper(mergedMappers, mergeIntoMapper.merge(mergeWithMapper, objectMergeContext));
                     }
@@ -687,6 +697,22 @@ public class ObjectMapper extends Mapper {
             return Map.copyOf(mergedMappers);
         }
 
+        /*
+         * We're ignoring the field if a dynamic mapping update tries to define a conflicting field type.
+         * This is caused by another index request with a different value racing to update the mappings.
+         * After updating the mappings, the index request will be re-tried and sees the updated mappings for this field.
+         * The updated mappings will then be taken into account when parsing the document
+         * (for example by coercing the value, ignore_malformed values, or failing the index request due to a type conflict).
+         */
+        private static boolean isConflictingDynamicMapping(
+            MapperMergeContext objectMergeContext,
+            Mapper mergeWithMapper,
+            Mapper mergeIntoMapper
+        ) {
+            return objectMergeContext.getMapperBuilderContext().getMergeReason().isAutoUpdate()
+                && mergeIntoMapper.typeName().equals(mergeWithMapper.typeName()) == false;
+        }
+
         private static void putMergedMapper(Map<String, Mapper> mergedMappers, @Nullable Mapper merged) {
             if (merged != null) {
                 mergedMappers.put(merged.leafName(), merged);

+ 34 - 0
server/src/test/java/org/elasticsearch/index/mapper/ObjectMapperMergeTests.java

@@ -9,13 +9,19 @@
 package org.elasticsearch.index.mapper;
 
 import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.script.ScriptCompiler;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.Collections;
 import java.util.Optional;
 
 import static org.elasticsearch.index.mapper.MapperService.MergeReason.INDEX_TEMPLATE;
+import static org.elasticsearch.index.mapper.MapperService.MergeReason.MAPPING_AUTO_UPDATE;
+import static org.elasticsearch.index.mapper.MapperService.MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT;
 import static org.elasticsearch.index.mapper.MapperService.MergeReason.MAPPING_UPDATE;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 
 public final class ObjectMapperMergeTests extends ESTestCase {
 
@@ -318,6 +324,34 @@ public final class ObjectMapperMergeTests extends ESTestCase {
         assertNotNull(parentMapper.getMapper("child.grandchild"));
     }
 
+    public void testConflictingDynamicUpdate() {
+        RootObjectMapper mergeInto = new RootObjectMapper.Builder("_doc", Optional.empty()).add(
+            new KeywordFieldMapper.Builder("http.status_code", IndexVersion.current())
+        ).build(MapperBuilderContext.root(false, false));
+        RootObjectMapper mergeWith = new RootObjectMapper.Builder("_doc", Optional.empty()).add(
+            new NumberFieldMapper.Builder(
+                "http.status_code",
+                NumberFieldMapper.NumberType.LONG,
+                ScriptCompiler.NONE,
+                false,
+                true,
+                IndexVersion.current(),
+                null
+            )
+        ).build(MapperBuilderContext.root(false, false));
+
+        MapperService.MergeReason autoUpdateMergeReason = randomFrom(MAPPING_AUTO_UPDATE, MAPPING_AUTO_UPDATE_PREFLIGHT);
+        ObjectMapper merged = mergeInto.merge(mergeWith, MapperMergeContext.root(false, false, autoUpdateMergeReason, Long.MAX_VALUE));
+        FieldMapper httpStatusCode = (FieldMapper) merged.getMapper("http.status_code");
+        assertThat(httpStatusCode, is(instanceOf(KeywordFieldMapper.class)));
+
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> mergeInto.merge(mergeWith, MapperMergeContext.root(false, false, MAPPING_UPDATE, Long.MAX_VALUE))
+        );
+        assertThat(e.getMessage(), equalTo("mapper [http.status_code] cannot be changed from type [keyword] to [long]"));
+    }
+
     private static RootObjectMapper createRootSubobjectFalseLeafWithDots() {
         FieldMapper.Builder fieldBuilder = new KeywordFieldMapper.Builder("host.name", IndexVersion.current());
         FieldMapper fieldMapper = fieldBuilder.build(MapperBuilderContext.root(false, false));