瀏覽代碼

Remove PipelineExecutionService#executeIndexRequest (#29537)

With the move long ago to execute all single-document indexing requests
as bulk indexing request, the method
PipelineExecutionService#executeIndexRequest is unused and will never be
used in production code. This commit removes this method and cuts over
all tests to use PipelineExecutionService#executeBulkRequest.
Jason Tedor 7 年之前
父節點
當前提交
a8d4ee1620

+ 0 - 17
server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java

@@ -53,23 +53,6 @@ public class PipelineExecutionService implements ClusterStateApplier {
         this.threadPool = threadPool;
     }
 
-    public void executeIndexRequest(IndexRequest request, Consumer<Exception> failureHandler, Consumer<Boolean> completionHandler) {
-        Pipeline pipeline = getPipeline(request.getPipeline());
-        threadPool.executor(ThreadPool.Names.INDEX).execute(new AbstractRunnable() {
-
-            @Override
-            public void onFailure(Exception e) {
-                failureHandler.accept(e);
-            }
-
-            @Override
-            protected void doRun() throws Exception {
-                innerExecute(request, pipeline);
-                completionHandler.accept(true);
-            }
-        });
-    }
-
     public void executeBulkRequest(Iterable<DocWriteRequest> actionRequests,
                                    BiConsumer<IndexRequest, Exception> itemFailureHandler,
                                    Consumer<Exception> completionHandler) {

+ 151 - 135
server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java

@@ -40,6 +40,7 @@ import org.mockito.invocation.InvocationOnMock;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
@@ -48,9 +49,10 @@ import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
@@ -78,19 +80,23 @@ public class PipelineExecutionServiceTests extends ESTestCase {
     }
 
     public void testExecuteIndexPipelineDoesNotExist() {
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
-        @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
-        @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
-        try {
-            executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-            fail("IllegalArgumentException expected");
-        } catch (IllegalArgumentException e) {
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+
+        final SetOnce<Boolean> failure = new SetOnce<>();
+        final BiConsumer<IndexRequest, Exception> failureHandler = (request, e) -> {
+            failure.set(true);
+            assertThat(request, sameInstance(indexRequest));
+            assertThat(e, instanceOf(IllegalArgumentException.class));
             assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist"));
-        }
-        verify(failureHandler, never()).accept(any(Exception.class));
-        verify(completionHandler, never()).accept(anyBoolean());
+        };
+
+        @SuppressWarnings("unchecked")
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+
+        assertTrue(failure.get());
+        verify(completionHandler, times(1)).accept(null);
     }
 
     public void testExecuteIndexPipelineExistsButFailedParsing() {
@@ -106,17 +112,23 @@ public class PipelineExecutionServiceTests extends ESTestCase {
                 return null;
             }
         })));
-        SetOnce<Boolean> failed = new SetOnce<>();
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
-        Consumer<Exception> failureHandler = (e) -> {
-            assertThat(e.getCause().getClass(), equalTo(IllegalArgumentException.class));
-            assertThat(e.getCause().getCause().getClass(), equalTo(IllegalStateException.class));
+
+        final SetOnce<Boolean> failure = new SetOnce<>();
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        final BiConsumer<IndexRequest, Exception> failureHandler = (request, e) -> {
+            assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
+            assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class));
             assertThat(e.getCause().getCause().getMessage(), equalTo("error"));
-            failed.set(true);
+            failure.set(true);
         };
-        Consumer<Boolean> completionHandler = (e) -> failed.set(false);
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        assertTrue(failed.get());
+
+        @SuppressWarnings("unchecked")
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+
+        assertTrue(failure.get());
+        verify(completionHandler, times(1)).accept(null);
     }
 
     public void testExecuteBulkPipelineDoesNotExist() {
@@ -152,41 +164,40 @@ public class PipelineExecutionServiceTests extends ESTestCase {
         verify(completionHandler, times(1)).accept(null);
     }
 
-    public void testExecuteSuccess() throws Exception {
-        CompoundProcessor processor = mock(CompoundProcessor.class);
+    public void testExecuteSuccess() {
+        final CompoundProcessor processor = mock(CompoundProcessor.class);
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
-
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
         @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
+        final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        verify(failureHandler, never()).accept(any());
-        verify(completionHandler, times(1)).accept(true);
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+        verify(failureHandler, never()).accept(any(), any());
+        verify(completionHandler, times(1)).accept(null);
     }
 
     public void testExecuteEmptyPipeline() throws Exception {
-        CompoundProcessor processor = mock(CompoundProcessor.class);
+        final CompoundProcessor processor = mock(CompoundProcessor.class);
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
         when(processor.getProcessors()).thenReturn(Collections.emptyList());
 
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
         @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
+        final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
         verify(processor, never()).execute(any());
-        verify(failureHandler, never()).accept(any());
-        verify(completionHandler, times(1)).accept(true);
+        verify(failureHandler, never()).accept(any(), any());
+        verify(completionHandler, times(1)).accept(null);
     }
 
     public void testExecutePropagateAllMetaDataUpdates() throws Exception {
-        CompoundProcessor processor = mock(CompoundProcessor.class);
+        final CompoundProcessor processor = mock(CompoundProcessor.class);
         when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
-        long newVersion = randomLong();
-        String versionType = randomFrom("internal", "external", "external_gt", "external_gte");
+        final long newVersion = randomLong();
+        final String versionType = randomFrom("internal", "external", "external_gt", "external_gte");
         doAnswer((InvocationOnMock invocationOnMock) -> {
             IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
             for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) {
@@ -202,15 +213,15 @@ public class PipelineExecutionServiceTests extends ESTestCase {
         }).when(processor).execute(any());
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
 
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
         @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
+        final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
         verify(processor).execute(any());
-        verify(failureHandler, never()).accept(any());
-        verify(completionHandler, times(1)).accept(true);
+        verify(failureHandler, never()).accept(any(), any());
+        verify(completionHandler, times(1)).accept(null);
         assertThat(indexRequest.index(), equalTo("update_index"));
         assertThat(indexRequest.type(), equalTo("update_type"));
         assertThat(indexRequest.id(), equalTo("update_id"));
@@ -220,89 +231,94 @@ public class PipelineExecutionServiceTests extends ESTestCase {
     }
 
     public void testExecuteFailure() throws Exception {
-        CompoundProcessor processor = mock(CompoundProcessor.class);
+        final CompoundProcessor processor = mock(CompoundProcessor.class);
         when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
-        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
-            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        doThrow(new RuntimeException())
+                .when(processor)
+                .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
         @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
+        final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        verify(processor).execute(eqID("_index", "_type", "_id",
-            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
-        verify(failureHandler, times(1)).accept(any(RuntimeException.class));
-        verify(completionHandler, never()).accept(anyBoolean());
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+        verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
+        verify(completionHandler, times(1)).accept(null);
     }
 
     public void testExecuteSuccessWithOnFailure() throws Exception {
-        Processor processor = mock(Processor.class);
+        final Processor processor = mock(Processor.class);
         when(processor.getType()).thenReturn("mock_processor_type");
         when(processor.getTag()).thenReturn("mock_processor_tag");
-        Processor onFailureProcessor = mock(Processor.class);
-        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
-                Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
+        final Processor onFailureProcessor = mock(Processor.class);
+        final CompoundProcessor compoundProcessor = new CompoundProcessor(
+                false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id")
-            .source(Collections.emptyMap()).setPipeline("_id");
-        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(Collections.emptyMap()));
         @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
+        final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        verify(failureHandler, never()).accept(any(ElasticsearchException.class));
-        verify(completionHandler, times(1)).accept(true);
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+        verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class));
+        verify(completionHandler, times(1)).accept(null);
     }
 
     public void testExecuteFailureWithOnFailure() throws Exception {
-        Processor processor = mock(Processor.class);
-        Processor onFailureProcessor = mock(Processor.class);
-        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
-                Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
+        final Processor processor = mock(Processor.class);
+        final Processor onFailureProcessor = mock(Processor.class);
+        final CompoundProcessor compoundProcessor = new CompoundProcessor(
+                false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
-        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
-            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
-        doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", indexRequest.version(),
-            indexRequest.versionType(), Collections.emptyMap()));
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        doThrow(new RuntimeException())
+                .when(processor)
+                .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        doThrow(new RuntimeException())
+                .when(onFailureProcessor)
+                .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
         @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
+        final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        verify(processor).execute(eqID("_index", "_type", "_id",
-            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
-        verify(failureHandler, times(1)).accept(any(RuntimeException.class));
-        verify(completionHandler, never()).accept(anyBoolean());
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+        verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
+        verify(completionHandler, times(1)).accept(null);
     }
 
     public void testExecuteFailureWithNestedOnFailure() throws Exception {
-        Processor processor = mock(Processor.class);
-        Processor onFailureProcessor = mock(Processor.class);
-        Processor onFailureOnFailureProcessor = mock(Processor.class);
-        CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor),
-            Collections.singletonList(new CompoundProcessor(false, Collections.singletonList(onFailureProcessor),
-                    Collections.singletonList(onFailureOnFailureProcessor))));
+        final Processor processor = mock(Processor.class);
+        final Processor onFailureProcessor = mock(Processor.class);
+        final Processor onFailureOnFailureProcessor = mock(Processor.class);
+        final List<Processor> processors = Collections.singletonList(onFailureProcessor);
+        final List<Processor> onFailureProcessors = Collections.singletonList(onFailureOnFailureProcessor);
+        final CompoundProcessor compoundProcessor = new CompoundProcessor(
+                false,
+                Collections.singletonList(processor),
+                Collections.singletonList(new CompoundProcessor(false, processors, onFailureProcessors)));
         when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor));
-        IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
-        doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id",
-            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
-        doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id",
-            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
-        doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id",
-            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
+        doThrow(new RuntimeException())
+                .when(onFailureOnFailureProcessor)
+                .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        doThrow(new RuntimeException())
+                .when(onFailureProcessor)
+                .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        doThrow(new RuntimeException())
+                .when(processor)
+                .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
         @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
+        final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        verify(processor).execute(eqID("_index", "_type", "_id",
-            indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
-        verify(failureHandler, times(1)).accept(any(RuntimeException.class));
-        verify(completionHandler, never()).accept(anyBoolean());
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+        verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap()));
+        verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
+        verify(completionHandler, times(1)).accept(null);
     }
 
     public void testBulkRequestExecutionWithFailures() throws Exception {
@@ -344,7 +360,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
         verify(completionHandler, times(1)).accept(null);
     }
 
-    public void testBulkRequestExecution() throws Exception {
+    public void testBulkRequestExecution() {
         BulkRequest bulkRequest = new BulkRequest();
         String pipelineId = "_id";
 
@@ -367,47 +383,47 @@ public class PipelineExecutionServiceTests extends ESTestCase {
         verify(completionHandler, times(1)).accept(null);
     }
 
-    public void testStats() throws Exception {
-        IngestStats ingestStats = executionService.stats();
-        assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(0));
-        assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(0L));
-        assertThat(ingestStats.getTotalStats().getIngestCurrent(), equalTo(0L));
-        assertThat(ingestStats.getTotalStats().getIngestFailedCount(), equalTo(0L));
-        assertThat(ingestStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L));
+    public void testStats() {
+        final IngestStats initialStats = executionService.stats();
+        assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0));
+        assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L));
+        assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L));
+        assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L));
+        assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L));
 
         when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, version, new CompoundProcessor(mock(Processor.class))));
         when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, null, new CompoundProcessor(mock(Processor.class))));
 
-        Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
+        final Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
         configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));
         configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON));
         executionService.updatePipelineStats(new IngestMetadata(configurationMap));
 
         @SuppressWarnings("unchecked")
-        Consumer<Exception> failureHandler = mock(Consumer.class);
+        final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
-        Consumer<Boolean> completionHandler = mock(Consumer.class);
+        final Consumer<Exception> completionHandler = mock(Consumer.class);
 
-        IndexRequest indexRequest = new IndexRequest("_index");
+        final IndexRequest indexRequest = new IndexRequest("_index");
         indexRequest.setPipeline("_id1");
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        ingestStats = executionService.stats();
-        assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(2));
-        assertThat(ingestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
-        assertThat(ingestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L));
-        assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(1L));
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+        final IngestStats afterFirstRequestStats = executionService.stats();
+        assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2));
+        assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
+        assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L));
+        assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L));
 
         indexRequest.setPipeline("_id2");
-        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
-        ingestStats = executionService.stats();
-        assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(2));
-        assertThat(ingestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
-        assertThat(ingestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
-        assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(2L));
+        executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler);
+        final IngestStats afterSecondRequestStats = executionService.stats();
+        assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2));
+        assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
+        assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
+        assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L));
     }
 
     // issue: https://github.com/elastic/elasticsearch/issues/18126
-    public void testUpdatingStatsWhenRemovingPipelineWorks() throws Exception {
+    public void testUpdatingStatsWhenRemovingPipelineWorks() {
         Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
         configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));
         configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON));
@@ -422,12 +438,12 @@ public class PipelineExecutionServiceTests extends ESTestCase {
         assertThat(executionService.stats().getStatsPerPipeline(), not(hasKey("_id2")));
     }
 
-    private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {
-        return argThat(new IngestDocumentMatcher(index, type, id, source));
+    private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
+        return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source));
     }
 
-    private IngestDocument eqID(String index, String type, String id, Long version, VersionType versionType, Map<String, Object> source) {
-        return argThat(new IngestDocumentMatcher(index, type, id, version, versionType, source));
+    private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map<String, Object> source) {
+        return argThat(new IngestDocumentMatcher("_index", "_type", "_id", version, versionType, source));
     }
 
     private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {