Browse Source

Fix IngestService to respect original document content type (#45799)

This PR modifies the logic in IngestService to preserve the original content type 
on the IndexRequest, such that when a document with a content type like SMILE 
is submitted to a pipeline, the resulting document that is persisted will remain in 
the original content type (SMILE in this case).
James Baiera 6 năm trước cách đây
mục cha
commit
5807ba0bfa

+ 1 - 1
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -438,7 +438,7 @@ public class IngestService implements ClusterStateApplier {
                 if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
                     indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
                 }
-                indexRequest.source(ingestDocument.getSourceAndMetadata());
+                indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
             }
         } catch (Exception e) {
             totalMetrics.ingestFailed();

+ 25 - 5
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
@@ -71,6 +72,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -890,20 +892,33 @@ public class IngestServiceTests extends ESTestCase {
         verify(completionHandler, times(1)).accept(null);
     }
 
-    public void testBulkRequestExecution() {
+    public void testBulkRequestExecution() throws Exception {
         BulkRequest bulkRequest = new BulkRequest();
         String pipelineId = "_id";
 
+        // Test to make sure that ingest respects content types other than the default index content type
+        XContentType xContentType = randomFrom(Arrays.stream(XContentType.values())
+                .filter(t -> Requests.INDEX_CONTENT_TYPE.equals(t) == false)
+                .collect(Collectors.toList()));
+
+        logger.info("Using [{}], not randomly determined default [{}]", xContentType, Requests.INDEX_CONTENT_TYPE);
         int numRequest = scaledRandomIntBetween(8, 64);
         for (int i = 0; i < numRequest; i++) {
             IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId);
-            indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1");
+            indexRequest.source(xContentType, "field1", "value1");
             bulkRequest.add(indexRequest);
         }
 
-        IngestService ingestService = createWithProcessors(emptyMap());
-        PutPipelineRequest putRequest =
-            new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON);
+        final Processor processor = mock(Processor.class);
+        when(processor.getType()).thenReturn("mock");
+        when(processor.getTag()).thenReturn("mockTag");
+        when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
+        Map<String, Processor.Factory> map = new HashMap<>(2);
+        map.put("mock", (factories, tag, config) -> processor);
+
+        IngestService ingestService = createWithProcessors(map);
+        PutPipelineRequest putRequest = new PutPipelineRequest("_id",
+            new BytesArray("{\"processors\": [{\"mock\": {}}], \"description\": \"_description\"}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
         ClusterState previousClusterState = clusterState;
         clusterState = IngestService.innerPut(putRequest, clusterState);
@@ -917,6 +932,11 @@ public class IngestServiceTests extends ESTestCase {
 
         verify(requestItemErrorHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(null);
+        for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests()) {
+            IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(docWriteRequest);
+            assertThat(indexRequest, notNullValue());
+            assertThat(indexRequest.getContentType(), equalTo(xContentType));
+        }
     }
 
     public void testStats() throws Exception {