|
@@ -812,6 +812,45 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2")));
|
|
|
}
|
|
|
|
|
|
+ public void testExecuteWithDrop() {
|
|
|
+ Map<String, Processor.Factory> factories = new HashMap<>();
|
|
|
+ factories.put("drop", new DropProcessor.Factory());
|
|
|
+ factories.put("mock", (processorFactories, tag, config) -> new Processor() {
|
|
|
+ @Override
|
|
|
+ public IngestDocument execute(final IngestDocument ingestDocument) {
|
|
|
+ throw new AssertionError("Document should have been dropped but reached this processor");
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getType() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String getTag() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ IngestService ingestService = createWithProcessors(factories);
|
|
|
+ PutPipelineRequest putRequest = new PutPipelineRequest("_id",
|
|
|
+ new BytesArray("{\"processors\": [{\"drop\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON);
|
|
|
+ ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
|
|
|
+ ClusterState previousClusterState = clusterState;
|
|
|
+ clusterState = IngestService.innerPut(putRequest, clusterState);
|
|
|
+ ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
+ final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ final Consumer<IndexRequest> dropHandler = mock(Consumer.class);
|
|
|
+ ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
|
|
|
+ verify(failureHandler, never()).accept(any(), any());
|
|
|
+ verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(dropHandler, times(1)).accept(indexRequest);
|
|
|
+ }
|
|
|
+
|
|
|
private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
|
|
|
return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source));
|
|
|
}
|