Browse Source

Add mappings for enrich fields (#96056)

We are developing a new component that performs lookup during query 
time. The idea is to utilize the existing enrich policies and indices 
used during indexing. However, to ensure proper functionality of the new
component, we need the mapping types and doc_values of the enrich
fields.

With this change, populating the mapping for enrich fields is a 
best-effort process to maintain the current behavior. This means that
any mapping conflicts arising between the enrich fields of the source
indices will be ignored. I will follow up to address this validation
issue.
Nhat Nguyen 2 years ago
parent
commit
00afc5b5a2

+ 5 - 0
docs/changelog/96056.yaml

@@ -0,0 +1,5 @@
+pr: 96056
+summary: Add mappings for enrich fields
+area: Ingest Node
+type: enhancement
+issues: []

+ 120 - 93
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

@@ -39,7 +39,8 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.CheckedFunction;
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -47,7 +48,6 @@ import org.elasticsearch.index.reindex.ReindexRequest;
 import org.elasticsearch.index.reindex.ScrollableHitSource;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.TaskCancelledException;
-import org.elasticsearch.xcontent.ObjectPath;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
@@ -57,6 +57,7 @@ import org.elasticsearch.xpack.enrich.action.EnrichReindexAction;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -180,9 +181,9 @@ public class EnrichPolicyRunner implements Runnable {
         }
         // Validate the key and values
         try {
-            validateField(mapping, policy.getMatchField(), true);
+            validateAndGetMappingTypeAndFormat(mapping, policy.getMatchField(), true);
             for (String valueFieldName : policy.getEnrichFields()) {
-                validateField(mapping, valueFieldName, false);
+                validateAndGetMappingTypeAndFormat(mapping, valueFieldName, false);
             }
         } catch (ElasticsearchException e) {
             throw new ElasticsearchException(
@@ -194,11 +195,64 @@ public class EnrichPolicyRunner implements Runnable {
         }
     }
 
-    private static void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
+    private record MappingTypeAndFormat(String type, String format) {
+
+    }
+
+    private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat(
+        String fieldName,
+        EnrichPolicy policy,
+        boolean strictlyRequired,
+        List<Map<String, Object>> sourceMappings
+    ) {
+        var fieldMappings = sourceMappings.stream()
+            .map(mapping -> validateAndGetMappingTypeAndFormat(mapping, fieldName, strictlyRequired))
+            .filter(Objects::nonNull)
+            .toList();
+        Set<String> types = fieldMappings.stream().map(tf -> tf.type).collect(Collectors.toSet());
+        if (types.size() > 1) {
+            if (strictlyRequired) {
+                throw new ElasticsearchException(
+                    "Multiple distinct mapping types for field '{}' - indices({})  types({})",
+                    fieldName,
+                    Strings.collectionToCommaDelimitedString(policy.getIndices()),
+                    Strings.collectionToCommaDelimitedString(types)
+                );
+            }
+            return null;
+        }
+        if (types.isEmpty()) {
+            return null;
+        }
+        Set<String> formats = fieldMappings.stream().map(tf -> tf.format).filter(Objects::nonNull).collect(Collectors.toSet());
+        if (formats.size() > 1) {
+            if (strictlyRequired) {
+                throw new ElasticsearchException(
+                    "Multiple distinct formats specified for field '{}' - indices({})  format entries({})",
+                    policy.getMatchField(),
+                    Strings.collectionToCommaDelimitedString(policy.getIndices()),
+                    Strings.collectionToCommaDelimitedString(formats)
+                );
+            }
+            return null;
+        }
+        return new MappingTypeAndFormat(Iterables.get(types, 0), formats.isEmpty() ? null : Iterables.get(formats, 0));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T extractValues(Map<String, Object> properties, String path) {
+        return (T) properties.get(path);
+    }
+
+    private static MappingTypeAndFormat validateAndGetMappingTypeAndFormat(
+        Map<String, Object> properties,
+        String fieldName,
+        boolean fieldRequired
+    ) {
         assert Strings.isEmpty(fieldName) == false : "Field name cannot be null or empty";
         String[] fieldParts = fieldName.split("\\.");
         StringBuilder parent = new StringBuilder();
-        Map<?, ?> currentField = properties;
+        Map<String, Object> currentField = properties;
         boolean onRoot = true;
         for (String fieldPart : fieldParts) {
             // Ensure that the current field is of object type only (not a nested type or a non compound field)
@@ -211,7 +265,7 @@ public class EnrichPolicyRunner implements Runnable {
                     type
                 );
             }
-            Map<?, ?> currentProperties = ((Map<?, ?>) currentField.get("properties"));
+            Map<String, Object> currentProperties = extractValues(currentField, "properties");
             if (currentProperties == null) {
                 if (fieldRequired) {
                     throw new ElasticsearchException(
@@ -220,10 +274,10 @@ public class EnrichPolicyRunner implements Runnable {
                         onRoot ? "root" : parent.toString()
                     );
                 } else {
-                    return;
+                    return null;
                 }
             }
-            currentField = ((Map<?, ?>) currentProperties.get(fieldPart));
+            currentField = extractValues(currentProperties, fieldPart);
             if (currentField == null) {
                 if (fieldRequired) {
                     throw new ElasticsearchException(
@@ -233,7 +287,7 @@ public class EnrichPolicyRunner implements Runnable {
                         onRoot ? "root" : parent.toString()
                     );
                 } else {
-                    return;
+                    return null;
                 }
             }
             if (onRoot) {
@@ -243,95 +297,70 @@ public class EnrichPolicyRunner implements Runnable {
             }
             parent.append(fieldPart);
         }
-    }
-
-    private XContentBuilder resolveEnrichMapping(final EnrichPolicy enrichPolicy, final List<Map<String, Object>> mappings) {
-        if (EnrichPolicy.MATCH_TYPE.equals(enrichPolicy.getType())) {
-            return createEnrichMappingBuilder((builder) -> builder.field("type", "keyword").field("doc_values", false));
-        } else if (EnrichPolicy.RANGE_TYPE.equals(enrichPolicy.getType())) {
-            return createRangeEnrichMappingBuilder(enrichPolicy, mappings);
-        } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(enrichPolicy.getType())) {
-            return createEnrichMappingBuilder((builder) -> builder.field("type", "geo_shape"));
-        } else {
-            throw new ElasticsearchException("Unrecognized enrich policy type [{}]", enrichPolicy.getType());
+        if (currentField == null) {
+            return null;
         }
+        final String type = (String) currentField.getOrDefault("type", "object");
+        final String format = (String) currentField.get("format");
+        return new MappingTypeAndFormat(type, format);
     }
 
-    private XContentBuilder createRangeEnrichMappingBuilder(EnrichPolicy enrichPolicy, List<Map<String, Object>> mappings) {
-        String matchFieldPath = "properties." + enrichPolicy.getMatchField().replace(".", ".properties.");
-        List<Map<String, String>> matchFieldMappings = mappings.stream()
-            .map(map -> ObjectPath.<Map<String, String>>eval(matchFieldPath, map))
-            .filter(Objects::nonNull)
-            .toList();
-
-        Set<String> types = matchFieldMappings.stream().map(map -> map.get("type")).collect(Collectors.toSet());
-        if (types.size() == 1) {
-            String type = types.iterator().next();
-            if (type == null) {
-                // when no type is defined in a field mapping then it is of type object:
-                throw new ElasticsearchException(
-                    "Field '{}' has type [object] which doesn't appear to be a range type",
-                    enrichPolicy.getMatchField(),
-                    type
-                );
-            }
+    static final Set<String> RANGE_TYPES = Set.of("integer_range", "float_range", "long_range", "double_range", "ip_range", "date_range");
 
-            switch (type) {
-                case "integer_range":
-                case "float_range":
-                case "long_range":
-                case "double_range":
-                case "ip_range":
-                    return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false));
-
-                // date_range types mappings allow for the format to be specified, should be preserved in the created index
-                case "date_range":
-                    Set<String> formatEntries = matchFieldMappings.stream().map(map -> map.get("format")).collect(Collectors.toSet());
-                    if (formatEntries.size() == 1) {
-                        return createEnrichMappingBuilder((builder) -> {
-                            builder.field("type", type).field("doc_values", false);
-                            String format = formatEntries.iterator().next();
-                            if (format != null) {
-                                builder.field("format", format);
-                            }
-                            return builder;
-                        });
-                    }
-                    if (formatEntries.isEmpty()) {
-                        // no format specify rely on default
-                        return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false));
-                    }
-                    throw new ElasticsearchException(
-                        "Multiple distinct date format specified for match field '{}' - indices({})  format entries({})",
-                        enrichPolicy.getMatchField(),
-                        Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()),
-                        (formatEntries.contains(null) ? "(DEFAULT), " : "") + Strings.collectionToCommaDelimitedString(formatEntries)
-                    );
-
-                default:
+    static Map<String, Object> mappingForMatchField(EnrichPolicy policy, List<Map<String, Object>> sourceMappings) {
+        MappingTypeAndFormat typeAndFormat = validateAndGetMappingTypeAndFormat(policy.getMatchField(), policy, true, sourceMappings);
+        if (typeAndFormat == null) {
+            throw new ElasticsearchException(
+                "Match field '{}' doesn't have a correct mapping type for policy type '{}'",
+                policy.getMatchField(),
+                policy.getType()
+            );
+        }
+        return switch (policy.getType()) {
+            case EnrichPolicy.MATCH_TYPE -> Map.of("type", "keyword", "doc_values", false);
+            case EnrichPolicy.GEO_MATCH_TYPE -> Map.of("type", "geo_shape");
+            case EnrichPolicy.RANGE_TYPE -> {
+                if (RANGE_TYPES.contains(typeAndFormat.type) == false) {
                     throw new ElasticsearchException(
                         "Field '{}' has type [{}] which doesn't appear to be a range type",
-                        enrichPolicy.getMatchField(),
-                        type
+                        policy.getMatchField(),
+                        typeAndFormat.type
                     );
+                }
+                Map<String, Object> mapping = Maps.newMapWithExpectedSize(3);
+                mapping.put("type", typeAndFormat.type);
+                mapping.put("doc_values", false);
+                if (typeAndFormat.format != null) {
+                    mapping.put("format", typeAndFormat.format);
+                }
+                yield mapping;
             }
-        }
-        if (types.isEmpty()) {
-            throw new ElasticsearchException(
-                "No mapping type found for match field '{}' - indices({})",
-                enrichPolicy.getMatchField(),
-                Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices())
-            );
-        }
-        throw new ElasticsearchException(
-            "Multiple distinct mapping types for match field '{}' - indices({})  types({})",
-            enrichPolicy.getMatchField(),
-            Strings.collectionToCommaDelimitedString(enrichPolicy.getIndices()),
-            Strings.collectionToCommaDelimitedString(types)
-        );
+            default -> throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
+        };
     }
 
-    private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping) {
+    private XContentBuilder createEnrichMapping(List<Map<String, Object>> sourceMappings) {
+        Map<String, Map<String, Object>> fieldMappings = new HashMap<>();
+        Map<String, Object> mappingForMatchField = mappingForMatchField(policy, sourceMappings);
+        for (String enrichField : policy.getEnrichFields()) {
+            if (enrichField.equals(policy.getMatchField())) {
+                mappingForMatchField = new HashMap<>(mappingForMatchField);
+                mappingForMatchField.remove("doc_values"); // enable doc_values
+            } else {
+                var typeAndFormat = validateAndGetMappingTypeAndFormat(enrichField, policy, false, sourceMappings);
+                if (typeAndFormat != null) {
+                    Map<String, Object> mapping = Maps.newMapWithExpectedSize(3);
+                    mapping.put("type", typeAndFormat.type);
+                    if (typeAndFormat.format != null) {
+                        mapping.put("format", typeAndFormat.format);
+                    }
+                    mapping.put("index", false); // disable index
+                    fieldMappings.put(enrichField, mapping);
+                }
+            }
+        }
+        fieldMappings.put(policy.getMatchField(), mappingForMatchField);
+
         // Enable _source on enrich index. Explicitly mark key mapping type.
         try {
             XContentBuilder builder = JsonXContent.contentBuilder();
@@ -347,9 +376,7 @@ public class EnrichPolicyRunner implements Runnable {
                     builder.endObject();
                     builder.startObject("properties");
                     {
-                        builder.startObject(policy.getMatchField());
-                        matchFieldMapping.apply(builder);
-                        builder.endObject();
+                        builder.mapContents(fieldMappings);
                     }
                     builder.endObject();
                     builder.startObject("_meta");
@@ -380,7 +407,7 @@ public class EnrichPolicyRunner implements Runnable {
             .put("index.warmer.enabled", false)
             .build();
         CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings);
-        createEnrichIndexRequest.mapping(resolveEnrichMapping(policy, mappings));
+        createEnrichIndexRequest.mapping(createEnrichMapping(mappings));
         logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName);
         enrichOriginClient().admin()
             .indices()

+ 377 - 193
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java

@@ -32,12 +32,14 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.FilterClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.Segment;
@@ -53,6 +55,7 @@ import org.elasticsearch.tasks.TaskAwareRequest;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -71,6 +74,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -153,15 +157,22 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) properties.get("field1");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("keyword")));
-        assertThat(field1.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+              "field1": {
+                "type": "keyword",
+                "doc_values": false
+              },
+              "field2": {
+                "type": "long",
+                "index": false
+              },
+              "field5": {
+                "type": "text",
+                "index": false
+              }
+            }
+            """);
         // Validate document structure
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
@@ -222,15 +233,17 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) properties.get("location");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("geo_shape")));
-        assertNull(field1.get("doc_values"));
-
+        assertEnrichMapping(mapping, """
+            {
+                "location": {
+                    "type": "geo_shape"
+                },
+                "zipcode": {
+                    "type": "long",
+                    "index": false
+                }
+            }
+            """);
         // Validate document structure
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
@@ -307,14 +320,18 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) properties.get("range");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo(rangeType + "_range")));
-        assertEquals(Boolean.FALSE, field1.get("doc_values"));
+        assertEnrichMapping(mapping, String.format(Locale.ROOT, """
+            {
+                "range": {
+                    "type": "%s",
+                    "doc_values": false
+                },
+                "zipcode": {
+                    "type": "long",
+                    "index": false
+                }
+            }
+            """, rangeType + "_range"));
 
         // Validate document structure
         SearchResponse enrichSearchResponse = client().search(
@@ -394,15 +411,18 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) properties.get("subnet");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("ip_range")));
-        assertThat(field1.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+                "subnet": {
+                    "type": "ip_range",
+                    "doc_values": false
+                },
+                "department": {
+                    "type": "text",
+                    "index": false
+                }
+            }
+            """);
         // Validate document structure and lookup of element in range
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(
@@ -483,15 +503,30 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
-        assertNotNull(keyfield);
-        assertThat(keyfield.get("type"), is(equalTo("keyword")));
-        assertThat(keyfield.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+              "key": {
+                "type": "keyword",
+                "doc_values": false
+              },
+              "idx": {
+                "type": "long",
+                "index": false
+              },
+              "field1": {
+                "type": "text",
+                "index": false
+              },
+              "field2": {
+                "type": "long",
+                "index": false
+              },
+              "field5": {
+                "type": "text",
+                "index": false
+              }
+            }
+            """);
         // Validate document structure
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
@@ -578,15 +613,30 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
-        assertNotNull(keyfield);
-        assertThat(keyfield.get("type"), is(equalTo("keyword")));
-        assertThat(keyfield.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+              "key": {
+                "type": "keyword",
+                "doc_values": false
+              },
+              "idx": {
+                "type": "long",
+                "index": false
+              },
+              "field1": {
+                "type": "text",
+                "index": false
+              },
+              "field2": {
+                "type": "long",
+                "index": false
+              },
+              "field5": {
+                "type": "text",
+                "index": false
+              }
+            }
+            """);
         // Validate document structure
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
@@ -675,15 +725,30 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> keyfield = (Map<?, ?>) properties.get("key");
-        assertNotNull(keyfield);
-        assertThat(keyfield.get("type"), is(equalTo("keyword")));
-        assertThat(keyfield.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+              "key": {
+                "type": "keyword",
+                "doc_values": false
+              },
+              "field1": {
+                "type": "text",
+                "index": false
+              },
+              "field2": {
+                "type": "long",
+                "index": false
+              },
+              "field5": {
+                "type": "text",
+                "index": false
+              },
+              "idx": {
+                "type": "long",
+                "index": false
+              }
+            }
+            """);
         // Validate document structure
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
@@ -971,21 +1036,22 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> data = (Map<?, ?>) properties.get("data");
-        assertNotNull(data);
-        assertThat(data.size(), is(equalTo(1)));
-        Map<?, ?> dataProperties = (Map<?, ?>) data.get("properties");
-        assertNotNull(dataProperties);
-        assertThat(dataProperties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) dataProperties.get("field1");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("keyword")));
-        assertThat(field1.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+              "data": {
+                "properties": {
+                  "field1": {
+                    "type": "keyword",
+                    "doc_values": false
+                  },
+                  "field2": {
+                    "type": "integer",
+                    "index": false
+                  }
+                }
+              }
+            }
+            """);
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
         ).actionGet();
@@ -1082,21 +1148,22 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> data = (Map<?, ?>) properties.get("data");
-        assertNotNull(data);
-        assertThat(data.size(), is(equalTo(1)));
-        Map<?, ?> dataProperties = (Map<?, ?>) data.get("properties");
-        assertNotNull(dataProperties);
-        assertThat(dataProperties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) dataProperties.get("field1");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("keyword")));
-        assertThat(field1.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+              "data": {
+                "properties": {
+                  "field1": {
+                    "type": "keyword",
+                    "doc_values": false
+                  },
+                  "field2": {
+                    "type": "integer",
+                    "index": false
+                  }
+                }
+              }
+            }
+            """);
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
         ).actionGet();
@@ -1194,21 +1261,22 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> data = (Map<?, ?>) properties.get("data");
-        assertNotNull(data);
-        assertThat(data.size(), is(equalTo(1)));
-        Map<?, ?> dataProperties = (Map<?, ?>) data.get("properties");
-        assertNotNull(dataProperties);
-        assertThat(dataProperties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) dataProperties.get("subnet");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("ip_range")));
-        assertThat(field1.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+                "data": {
+                    "properties": {
+                        "subnet": {
+                            "type": "ip_range",
+                            "doc_values": false
+                        },
+                        "department": {
+                            "type": "keyword",
+                            "index": false
+                        }
+                    }
+                }
+            }
+            """);
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(
                 SearchSourceBuilder.searchSource().query(QueryBuilders.matchQuery("data.subnet", "10.0.0.1"))
@@ -1315,26 +1383,26 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> data = (Map<?, ?>) properties.get("data");
-        assertNotNull(data);
-        assertThat(data.size(), is(equalTo(1)));
-        Map<?, ?> dataProperties = (Map<?, ?>) data.get("properties");
-        assertNotNull(dataProperties);
-        assertThat(dataProperties.size(), is(equalTo(1)));
-        Map<?, ?> fields = (Map<?, ?>) dataProperties.get("fields");
-        assertNotNull(fields);
-        assertThat(fields.size(), is(equalTo(1)));
-        Map<?, ?> fieldsProperties = (Map<?, ?>) fields.get("properties");
-        assertNotNull(fieldsProperties);
-        assertThat(fieldsProperties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) fieldsProperties.get("field1");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("keyword")));
-        assertThat(field1.get("doc_values"), is(false));
+        assertEnrichMapping(mapping, """
+            {
+              "data": {
+                "properties": {
+                  "fields": {
+                    "properties": {
+                      "field1": {
+                        "type": "keyword",
+                        "doc_values": false
+                      },
+                      "field2": {
+                        "type": "integer",
+                        "index": false
+                      }
+                    }
+                  }
+                }
+              }
+            }
+            """);
 
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
@@ -1442,27 +1510,26 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> data = (Map<?, ?>) properties.get("data");
-        assertNotNull(data);
-        assertThat(data.size(), is(equalTo(1)));
-        Map<?, ?> dataProperties = (Map<?, ?>) data.get("properties");
-        assertNotNull(dataProperties);
-        assertThat(dataProperties.size(), is(equalTo(1)));
-        Map<?, ?> fields = (Map<?, ?>) dataProperties.get("fields");
-        assertNotNull(fields);
-        assertThat(fields.size(), is(equalTo(1)));
-        Map<?, ?> fieldsProperties = (Map<?, ?>) fields.get("properties");
-        assertNotNull(fieldsProperties);
-        assertThat(fieldsProperties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) fieldsProperties.get("subnet");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("ip_range")));
-        assertThat(field1.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+                "data": {
+                    "properties": {
+                        "fields": {
+                            "properties": {
+                                "subnet": {
+                                    "type": "ip_range",
+                                    "doc_values": false
+                                },
+                                "department": {
+                                    "type": "keyword",
+                                    "index": false
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            """);
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
         ).actionGet();
@@ -1576,26 +1643,27 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> data = (Map<?, ?>) properties.get("data");
-        assertNotNull(data);
-        assertThat(data.size(), is(equalTo(1)));
-        Map<?, ?> dataProperties = (Map<?, ?>) data.get("properties");
-        assertNotNull(dataProperties);
-        assertThat(dataProperties.size(), is(equalTo(1)));
-        Map<?, ?> fields = (Map<?, ?>) dataProperties.get("fields");
-        assertNotNull(fields);
-        assertThat(fields.size(), is(equalTo(1)));
-        Map<?, ?> fieldsProperties = (Map<?, ?>) fields.get("properties");
-        assertNotNull(fieldsProperties);
-        assertThat(fieldsProperties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) fieldsProperties.get("period");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("date_range")));
-        assertThat(field1.get("doc_values"), is(false));
+        assertEnrichMapping(mapping, """
+            {
+             "data": {
+                "properties": {
+                    "fields": {
+                        "properties": {
+                            "period": {
+                                "type": "date_range",
+                                "format": "yyyy'/'MM'/'dd' at 'HH':'mm||strict_date_time",
+                                "doc_values": false
+                            },
+                            "status": {
+                                "type": "keyword",
+                                "index": false
+                            }
+                        }
+                    }
+                }
+             }
+            }
+            """);
 
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(
@@ -1708,21 +1776,22 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
         validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> data = (Map<?, ?>) properties.get("data");
-        assertNotNull(data);
-        assertThat(data.size(), is(equalTo(1)));
-        Map<?, ?> dataProperties = (Map<?, ?>) data.get("properties");
-        assertNotNull(dataProperties);
-        assertThat(dataProperties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) dataProperties.get("field1");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("keyword")));
-        assertThat(field1.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+              "data": {
+                "properties": {
+                  "field1": {
+                    "type": "keyword",
+                    "doc_values": false
+                  },
+                  "field2": {
+                    "type": "integer",
+                    "index": false
+                  }
+                }
+              }
+            }
+            """);
         SearchResponse enrichSearchResponse = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
         ).actionGet();
@@ -1867,16 +1936,22 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         // Validate Mapping
         Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
-        validateMappingMetadata(mapping, policyName, policy);
-        assertThat(mapping.get("dynamic"), is("false"));
-        Map<?, ?> properties = (Map<?, ?>) mapping.get("properties");
-        assertNotNull(properties);
-        assertThat(properties.size(), is(equalTo(1)));
-        Map<?, ?> field1 = (Map<?, ?>) properties.get("field1");
-        assertNotNull(field1);
-        assertThat(field1.get("type"), is(equalTo("keyword")));
-        assertThat(field1.get("doc_values"), is(false));
-
+        assertEnrichMapping(mapping, """
+            {
+              "field1": {
+                "type": "keyword",
+                "doc_values": false
+              },
+              "field2": {
+                "type": "long",
+                "index": false
+              },
+              "field5": {
+                "type": "text",
+                "index": false
+              }
+            }
+            """);
         // Validate document structure
         SearchResponse allEnrichDocs = client().search(
             new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
@@ -2007,6 +2082,108 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         assertThat(e.getMessage(), equalTo("Field 'field1' has type [object] which doesn't appear to be a range type"));
     }
 
+    public void testEnrichFieldsConflictMappingTypes() {
+        createIndex("source-1", Settings.EMPTY, "_doc", "user", "type=keyword", "name", "type=text", "zipcode", "type=long");
+        client().prepareIndex("source-1")
+            .setSource("user", "u1", "name", "n", "zipcode", 90000)
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        createIndex("source-2", Settings.EMPTY, "_doc", "user", "type=keyword", "zipcode", "type=long");
+
+        client().prepareIndex("source-2").setSource("""
+            {
+              "user": "u2",
+              "name": {
+                "first": "f",
+                "last": "l"
+              },
+              "zipcode": 90001
+            }
+            """, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+
+        EnrichPolicy policy = new EnrichPolicy(
+            EnrichPolicy.MATCH_TYPE,
+            null,
+            List.of("source-1", "source-2"),
+            "user",
+            List.of("name", "zipcode")
+        );
+        String policyName = "test1";
+        final long createTime = randomNonNegativeLong();
+        String createdEnrichIndex = ".enrich-test1-" + createTime;
+        PlainActionFuture<ExecuteEnrichPolicyStatus> future = new PlainActionFuture<>();
+        EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, future, createdEnrichIndex);
+        enrichPolicyRunner.run();
+        future.actionGet();
+
+        // Validate Index definition
+        GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex);
+        Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
+        assertEnrichMapping(mapping, """
+            {
+              "user": {
+                "type": "keyword",
+                "doc_values": false
+              },
+              "zipcode": {
+                "type": "long",
+                "index": false
+              }
+            }
+            """);
+        // Validate document structure
+        SearchResponse searchResponse = client().search(new SearchRequest(".enrich-test1")).actionGet();
+        ElasticsearchAssertions.assertHitCount(searchResponse, 2L);
+        Map<String, Object> hit0 = searchResponse.getHits().getAt(0).getSourceAsMap();
+        assertThat(hit0, equalTo(Map.of("user", "u1", "name", "n", "zipcode", 90000)));
+        Map<String, Object> hit1 = searchResponse.getHits().getAt(1).getSourceAsMap();
+        assertThat(hit1, equalTo(Map.of("user", "u2", "name", Map.of("first", "f", "last", "l"), "zipcode", 90001)));
+    }
+
+    public void testEnrichMappingConflictFormats() {
+        createIndex("source-1", Settings.EMPTY, "_doc", "user", "type=keyword", "date", "type=date,format=yyyy");
+        client().prepareIndex("source-1")
+            .setSource("user", "u1", "date", "2023")
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        createIndex("source-2", Settings.EMPTY, "_doc", "user", "type=keyword", "date", "type=date,format=yyyy-MM");
+
+        client().prepareIndex("source-2").setSource("""
+            {
+              "user": "u2",
+              "date": "2023-05"
+            }
+            """, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source-1", "source-2"), "user", List.of("date"));
+        String policyName = "test1";
+        final long createTime = randomNonNegativeLong();
+        String createdEnrichIndex = ".enrich-test1-" + createTime;
+        PlainActionFuture<ExecuteEnrichPolicyStatus> future = new PlainActionFuture<>();
+        EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, future, createdEnrichIndex);
+        enrichPolicyRunner.run();
+        future.actionGet();
+
+        // Validate Index definition
+        GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex);
+        Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
+        assertEnrichMapping(mapping, """
+            {
+              "user": {
+                "type": "keyword",
+                "doc_values": false
+              }
+            }
+            """);
+        // Validate document structure
+        SearchResponse searchResponse = client().search(new SearchRequest(".enrich-test1")).actionGet();
+        ElasticsearchAssertions.assertHitCount(searchResponse, 2L);
+        Map<String, Object> hit0 = searchResponse.getHits().getAt(0).getSourceAsMap();
+        assertThat(hit0, equalTo(Map.of("user", "u1", "date", "2023")));
+        Map<String, Object> hit1 = searchResponse.getHits().getAt(1).getSourceAsMap();
+        assertThat(hit1, equalTo(Map.of("user", "u2", "date", "2023-05")));
+    }
+
     private EnrichPolicyRunner createPolicyRunner(
         String policyName,
         EnrichPolicy policy,
@@ -2124,4 +2301,11 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         assertThat(expected.getMessage(), containsString("index [" + createdEnrichIndex + "] blocked by: [FORBIDDEN/8/index write (api)]"));
     }
+
+    private static void assertEnrichMapping(Map<String, Object> actual, String expectedMapping) {
+        assertThat(actual.get("dynamic"), is("false"));
+        Object actualProperties = actual.get("properties");
+        Map<String, Object> mappings = XContentHelper.convertToMap(JsonXContent.jsonXContent, expectedMapping, false);
+        assertThat(actualProperties, equalTo(mappings));
+    }
 }