Browse Source

Adding simulate ingest effective mapping (#132833)

Keith Massey 2 months ago
parent
commit
768396db8c

+ 5 - 0
docs/changelog/132833.yaml

@@ -0,0 +1,5 @@
+pr: 132833
+summary: Adding simulate ingest effective mapping
+area: Ingest Node
+type: enhancement
+issues: []

+ 128 - 0
qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/80_ingest_simulate.yml

@@ -2182,3 +2182,131 @@ setup:
   - match: { docs.0.doc._index: "test" }
   - match: { docs.0.doc._source.foo: "bar" }
   - match: { docs.0.doc.error.type: "document_parsing_exception" }
+
+---
+"Test effective mapping":
+
+  # This creates two templates, where the first reroutes to the second. Then we simulate ingesting and make sure that
+  # the effective_mapping is for the index where the document eventually would land. Also, the second index is really
+  # a data stream, so we expect to see a @timestamp field.
+
+  - skip:
+      features:
+        - headers
+        - allowed_warnings
+
+  - do:
+      headers:
+        Content-Type: application/json
+      ingest.put_pipeline:
+        id: "reroute-pipeline"
+        body:  >
+          {
+            "processors": [
+              {
+                "reroute": {
+                  "destination": "second-index"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      allowed_warnings:
+        - "index template [first-index-template] has index patterns [first-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [first-index-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: first-index-template
+        body:
+          index_patterns: first-index*
+          template:
+            settings:
+              default_pipeline: "reroute-pipeline"
+            mappings:
+              dynamic: strict
+              properties:
+                foo:
+                  type: text
+
+  - do:
+      allowed_warnings:
+        - "index template [second-index-template] has index patterns [second-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [second-index-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: second-index-template
+        body:
+          index_patterns: second-index*
+          template:
+            mappings:
+              dynamic: strict
+              properties:
+                bar:
+                  type: text
+
+  - do:
+      indices.put_index_template:
+        name: second-index-template
+        body:
+          index_patterns: second-index*
+          template:
+            lifecycle:
+              data_retention: "7d"
+            mappings:
+              dynamic: strict
+              properties:
+                bar:
+                  type: text
+          data_stream: {}
+
+  - do:
+      indices.create_data_stream:
+        name: second-index
+  - is_true: acknowledged
+
+  - do:
+      cluster.health:
+        wait_for_status: yellow
+
+  - do:
+      indices.put_data_stream_mappings:
+        name: second-index
+        body:
+          properties:
+            foo:
+              type: boolean
+
+  - match: { data_streams.0.applied_to_data_stream: true }
+
+  # Here is the meat of the test. We simulate ingesting into first-index, knowing it will be rerouted to second-index,
+  # which is actually a data stream. So we expect the effective_mapping to contain the fields from second-index
+  # (including the implicit @timestamp field) and not first-index. Plus, it ought to include fields from the
+  # mapping_addition that we pass in.
+  - do:
+      headers:
+        Content-Type: application/json
+      simulate.ingest:
+        body: >
+          {
+            "docs": [
+              {
+                "_index": "first-index",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ],
+            "mapping_addition": {
+              "dynamic": "strict",
+              "properties": {
+                "baz": {
+                  "type": "keyword"
+                }
+              }
+            }
+          }
+  - length: { docs: 1 }
+  - match: { docs.0.doc._index: "second-index" }
+  - not_exists: docs.0.doc.effective_mapping._doc.properties.foo
+  - match: { docs.0.doc.effective_mapping._doc.properties.@timestamp.type: "date" }
+  - match: { docs.0.doc.effective_mapping._doc.properties.bar.type: "text" }
+  - match: { docs.0.doc.effective_mapping._doc.properties.baz.type: "keyword" }

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -364,6 +364,7 @@ public class TransportVersions {
     public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
     public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
     public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
+    public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 19 - 15
server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

@@ -35,7 +35,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Nullable;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.index.IndexSettingProvider;
@@ -144,14 +143,13 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
             DocWriteRequest<?> docRequest = bulkRequest.requests.get(i);
             assert docRequest instanceof IndexRequest : "TransportSimulateBulkAction should only ever be called with IndexRequests";
             IndexRequest request = (IndexRequest) docRequest;
-            Tuple<Collection<String>, Exception> validationResult = validateMappings(
+            ValidationResult validationResult = validateMappings(
                 componentTemplateSubstitutions,
                 indexTemplateSubstitutions,
                 mappingAddition,
                 request,
                 mappingMergeReason
             );
-            Exception mappingValidationException = validationResult.v2();
             responses.set(
                 i,
                 BulkItemResponse.success(
@@ -164,8 +162,9 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
                         request.source(),
                         request.getContentType(),
                         request.getExecutedPipelines(),
-                        validationResult.v1(),
-                        mappingValidationException
+                        validationResult.ignoredFields,
+                        validationResult.validationException,
+                        validationResult.effectiveMapping
                     )
                 )
             );
@@ -193,7 +192,7 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
      * @return a ValidationResult containing the effective mapping (possibly null), the mapping exception if the source does not match
      * the mappings (otherwise null), and the names of any fields that would be ignored upon indexing
      */
-    private Tuple<Collection<String>, Exception> validateMappings(
+    private ValidationResult validateMappings(
         Map<String, ComponentTemplate> componentTemplateSubstitutions,
         Map<String, ComposableIndexTemplate> indexTemplateSubstitutions,
         Map<String, Object> mappingAddition,
@@ -211,6 +210,7 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
         );
 
         ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state());
+        CompressedXContent effectiveMapping = null;
         Exception mappingValidationException = null;
         Collection<String> ignoredFields = List.of();
         IndexAbstraction indexAbstraction = project.getIndicesLookup().get(request.index());
@@ -222,8 +222,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
                  */
                 IndexMetadata imd = project.getIndexSafe(indexAbstraction.getWriteIndex(request, project));
                 CompressedXContent mappings = Optional.ofNullable(imd.mapping()).map(MappingMetadata::source).orElse(null);
-                CompressedXContent mergedMappings = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
-                ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, mergedMappings, request, sourceToParse, mappingMergeReason);
+                effectiveMapping = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
+                ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, effectiveMapping, request, sourceToParse, mappingMergeReason);
             } else {
                 /*
                  * The index did not exist, or we have component template substitutions, so we put together the mappings from existing
@@ -281,8 +281,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
                         indexSettingProviders
                     );
                     CompressedXContent mappings = template.mappings();
-                    CompressedXContent mergedMappings = mergeMappings(mappings, mappingAddition);
-                    ignoredFields = validateUpdatedMappings(mappings, mergedMappings, request, sourceToParse, mappingMergeReason);
+                    effectiveMapping = mergeMappings(mappings, mappingAddition);
+                    ignoredFields = validateUpdatedMappings(mappings, effectiveMapping, request, sourceToParse, mappingMergeReason);
                 } else {
                     List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedProjectMetadata, request.index(), false);
                     if (matchingTemplates.isEmpty() == false) {
@@ -295,23 +295,27 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
                             matchingTemplates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
                             xContentRegistry
                         );
-                        final CompressedXContent combinedMappings = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
-                        ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
+                        effectiveMapping = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
+                        ignoredFields = validateUpdatedMappings(null, effectiveMapping, request, sourceToParse, mappingMergeReason);
                     } else {
                         /*
                          * The index matched no templates and had no mapping of its own. If there were component template substitutions
                          * or index template substitutions, they didn't match anything. So just apply the mapping addition if it exists,
                          * and validate.
                          */
-                        final CompressedXContent combinedMappings = mergeMappings(null, mappingAddition);
-                        ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
+                        effectiveMapping = mergeMappings(null, mappingAddition);
+                        ignoredFields = validateUpdatedMappings(null, effectiveMapping, request, sourceToParse, mappingMergeReason);
                     }
                 }
             }
         } catch (Exception e) {
             mappingValidationException = e;
         }
-        return Tuple.tuple(ignoredFields, mappingValidationException);
+        return new ValidationResult(effectiveMapping, mappingValidationException, ignoredFields);
+    }
+
+    private record ValidationResult(CompressedXContent effectiveMapping, Exception validationException, Collection<String> ignoredFields) {
+
     }
 
     /*

+ 29 - 1
server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java

@@ -14,6 +14,7 @@ import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentHelper;
@@ -26,6 +27,7 @@ import org.elasticsearch.xcontent.XContentType;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * This is an IndexResponse that is specifically for simulate requests. Unlike typical IndexResponses, we need to include the original
@@ -37,6 +39,7 @@ public class SimulateIndexResponse extends IndexResponse {
     private final XContentType sourceXContentType;
     private final Collection<String> ignoredFields;
     private final Exception exception;
+    private final CompressedXContent effectiveMapping;
 
     @SuppressWarnings("this-escape")
     public SimulateIndexResponse(StreamInput in) throws IOException {
@@ -54,6 +57,15 @@ public class SimulateIndexResponse extends IndexResponse {
         } else {
             this.ignoredFields = List.of();
         }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_EFFECTIVE_MAPPING)) {
+            if (in.readBoolean()) {
+                this.effectiveMapping = CompressedXContent.readCompressedString(in);
+            } else {
+                this.effectiveMapping = null;
+            }
+        } else {
+            effectiveMapping = null;
+        }
     }
 
     @SuppressWarnings("this-escape")
@@ -65,7 +77,8 @@ public class SimulateIndexResponse extends IndexResponse {
         XContentType sourceXContentType,
         List<String> pipelines,
         Collection<String> ignoredFields,
-        @Nullable Exception exception
+        @Nullable Exception exception,
+        @Nullable CompressedXContent effectiveMapping
     ) {
         // We don't actually care about most of the IndexResponse fields:
         super(
@@ -83,6 +96,7 @@ public class SimulateIndexResponse extends IndexResponse {
         setShardInfo(ShardInfo.EMPTY);
         this.ignoredFields = ignoredFields;
         this.exception = exception;
+        this.effectiveMapping = effectiveMapping;
     }
 
     @Override
@@ -108,6 +122,14 @@ public class SimulateIndexResponse extends IndexResponse {
             ElasticsearchException.generateThrowableXContent(builder, params, exception);
             builder.endObject();
         }
+        if (effectiveMapping == null) {
+            builder.field("effective_mapping", Map.of());
+        } else {
+            builder.field(
+                "effective_mapping",
+                XContentHelper.convertToMap(effectiveMapping.uncompressed(), true, builder.contentType()).v2()
+            );
+        }
         return builder;
     }
 
@@ -127,6 +149,12 @@ public class SimulateIndexResponse extends IndexResponse {
         if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_IGNORED_FIELDS)) {
             out.writeStringCollection(ignoredFields);
         }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_EFFECTIVE_MAPPING)) {
+            out.writeBoolean(effectiveMapping != null);
+            if (effectiveMapping != null) {
+                effectiveMapping.writeTo(out);
+            }
+        }
     }
 
     public Exception getException() {

+ 9 - 4
server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java

@@ -185,7 +185,8 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
                                           "_index": "%s",
                                           "_version": -3,
                                           "_source": %s,
-                                          "executed_pipelines": [%s]
+                                          "executed_pipelines": [%s],
+                                          "effective_mapping":{}
                                         }""",
                                     indexRequest.id(),
                                     indexRequest.index(),
@@ -319,7 +320,8 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
                                               "_version": -3,
                                               "_source": %s,
                                               "executed_pipelines": [%s],
-                                              "error":{"type":"exception","reason":"invalid mapping"}
+                                              "error":{"type":"exception","reason":"invalid mapping"},
+                                              "effective_mapping":{"_doc":{"dynamic":"strict"}}
                                             }""",
                                         indexRequest.id(),
                                         indexName,
@@ -346,7 +348,8 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
                                               "_index": "%s",
                                               "_version": -3,
                                               "_source": %s,
-                                              "executed_pipelines": [%s]
+                                              "executed_pipelines": [%s],
+                                              "effective_mapping":{"_doc":{"dynamic":"strict"}}
                                             }""",
                                         indexRequest.id(),
                                         indexName,
@@ -373,7 +376,9 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
         };
         when(indicesService.withTempIndexService(any(), any())).thenAnswer((Answer<?>) invocation -> {
             IndexMetadata imd = invocation.getArgument(0);
-            if (indicesWithInvalidMappings.contains(imd.getIndex().getName())) {
+            if (indicesWithInvalidMappings.contains(imd.getIndex().getName())
+                // We only want to throw exceptions inside TransportSimulateBulkAction:
+                && invocation.getArgument(1).getClass().getSimpleName().contains(TransportSimulateBulkAction.class.getSimpleName())) {
                 throw new ElasticsearchException("invalid mapping");
             } else {
                 // we don't actually care what is returned, as long as no exception is thrown the request is considered valid:

+ 51 - 5
server/src/test/java/org/elasticsearch/action/ingest/SimulateIndexResponseTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.RandomObjects;
@@ -25,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings;
 import static org.hamcrest.Matchers.equalTo;
 
 public class SimulateIndexResponseTests extends ESTestCase {
@@ -49,6 +51,7 @@ public class SimulateIndexResponseTests extends ESTestCase {
             XContentType.JSON,
             pipelines,
             List.of(),
+            null,
             null
         );
 
@@ -61,7 +64,8 @@ public class SimulateIndexResponseTests extends ESTestCase {
                           "_index": "%s",
                           "_version": %d,
                           "_source": %s,
-                          "executed_pipelines": [%s]
+                          "executed_pipelines": [%s],
+                          "effective_mapping": {}
                         }""",
                     id,
                     index,
@@ -81,7 +85,8 @@ public class SimulateIndexResponseTests extends ESTestCase {
             XContentType.JSON,
             pipelines,
             List.of(),
-            new ElasticsearchException("Some failure")
+            new ElasticsearchException("Some failure"),
+            null
         );
 
         assertEquals(
@@ -94,7 +99,8 @@ public class SimulateIndexResponseTests extends ESTestCase {
                           "_version": %d,
                           "_source": %s,
                           "executed_pipelines": [%s],
-                          "error":{"type":"exception","reason":"Some failure"}
+                          "error":{"type":"exception","reason":"Some failure"},
+                          "effective_mapping": {}
                         }""",
                     id,
                     index,
@@ -114,6 +120,7 @@ public class SimulateIndexResponseTests extends ESTestCase {
             XContentType.JSON,
             pipelines,
             List.of("abc", "def"),
+            null,
             null
         );
 
@@ -127,7 +134,8 @@ public class SimulateIndexResponseTests extends ESTestCase {
                           "_version": %d,
                           "_source": %s,
                           "executed_pipelines": [%s],
-                          "ignored_fields": [{"field": "abc"}, {"field": "def"}]
+                          "ignored_fields": [{"field": "abc"}, {"field": "def"}],
+                          "effective_mapping": {}
                         }""",
                     id,
                     index,
@@ -138,6 +146,39 @@ public class SimulateIndexResponseTests extends ESTestCase {
             ),
             Strings.toString(indexResponseWithIgnoredFields)
         );
+
+        SimulateIndexResponse responseWithEffectiveMapping = new SimulateIndexResponse(
+            id,
+            index,
+            version,
+            sourceBytes,
+            XContentType.JSON,
+            pipelines,
+            List.of(),
+            null,
+            new CompressedXContent("{\"properties\":{\"foo\":{\"type\":\"keyword\"}}}")
+        );
+        assertEquals(
+            XContentHelper.stripWhitespace(
+                Strings.format(
+                    """
+                        {
+                          "_id": "%s",
+                          "_index": "%s",
+                          "_version": %d,
+                          "_source": %s,
+                          "executed_pipelines": [%s],
+                          "effective_mapping": {"properties": {"foo": {"type": "keyword"}}}
+                        }""",
+                    id,
+                    index,
+                    version,
+                    source,
+                    pipelines.stream().map(pipeline -> "\"" + pipeline + "\"").collect(Collectors.joining(","))
+                )
+            ),
+            Strings.toString(responseWithEffectiveMapping)
+        );
     }
 
     public void testSerialization() throws IOException {
@@ -171,7 +212,12 @@ public class SimulateIndexResponseTests extends ESTestCase {
             xContentType,
             pipelines,
             randomList(0, 20, () -> randomAlphaOfLength(15)),
-            randomBoolean() ? null : new ElasticsearchException("failed")
+            randomBoolean() ? null : new ElasticsearchException("failed"),
+            randomEffectiveMapping()
         );
     }
+
+    private static CompressedXContent randomEffectiveMapping() {
+        return randomBoolean() ? null : randomMappings();
+    }
 }

+ 17 - 6
server/src/test/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestActionTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.ingest.SimulateIndexResponse;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.rest.AbstractRestChannel;
 import org.elasticsearch.rest.RestResponse;
@@ -23,6 +24,7 @@ import org.elasticsearch.test.rest.FakeRestRequest;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.XContentType;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -157,9 +159,9 @@ public class RestSimulateIngestActionTests extends ESTestCase {
     public void testSimulateIngestRestToXContentListener() throws Exception {
         // First, make sure it works with success responses:
         BulkItemResponse[] responses = new BulkItemResponse[3];
-        responses[0] = getSuccessBulkItemResponse("123", "{\"foo\": \"bar\"}");
+        responses[0] = getSuccessBulkItemResponse("123", "{\"foo\": \"bar\"}", false);
         responses[1] = getFailureBulkItemResponse("678", "This has failed");
-        responses[2] = getSuccessBulkItemResponse("456", "{\"bar\": \"baz\"}");
+        responses[2] = getSuccessBulkItemResponse("456", "{\"bar\": \"baz\"}", true);
         BulkResponse bulkResponse = new BulkResponse(responses, randomLongBetween(0, 50000));
         String expectedXContent = """
             {
@@ -183,7 +185,8 @@ public class RestSimulateIngestActionTests extends ESTestCase {
                       {
                         "field" : "def"
                       }
-                    ]
+                    ],
+                    "effective_mapping" : { }
                   }
                 },
                 {
@@ -215,7 +218,14 @@ public class RestSimulateIngestActionTests extends ESTestCase {
                       {
                         "field" : "def"
                       }
-                    ]
+                    ],
+                    "effective_mapping" : {
+                      "properties" : {
+                        "foo" : {
+                          "type" : "keyword"
+                        }
+                      }
+                    }
                   }
                 }
               ]
@@ -231,7 +241,7 @@ public class RestSimulateIngestActionTests extends ESTestCase {
         );
     }
 
-    private BulkItemResponse getSuccessBulkItemResponse(String id, String source) {
+    private BulkItemResponse getSuccessBulkItemResponse(String id, String source, boolean hasMapping) throws IOException {
         ByteBuffer[] sourceByteBuffer = new ByteBuffer[1];
         sourceByteBuffer[0] = ByteBuffer.wrap(source.getBytes(StandardCharsets.UTF_8));
         return BulkItemResponse.success(
@@ -245,7 +255,8 @@ public class RestSimulateIngestActionTests extends ESTestCase {
                 XContentType.JSON,
                 List.of("pipeline1", "pipeline2"),
                 List.of("abc", "def"),
-                null
+                null,
+                hasMapping ? new CompressedXContent("{\"properties\":{\"foo\":{\"type\":\"keyword\"}}}") : null
             )
         );
     }