|
@@ -73,6 +73,7 @@ import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import java.util.function.Consumer;
|
|
|
+import java.util.function.IntConsumer;
|
|
|
import java.util.function.LongSupplier;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
@@ -86,6 +87,7 @@ import static org.hamcrest.Matchers.notNullValue;
|
|
|
import static org.hamcrest.Matchers.nullValue;
|
|
|
import static org.hamcrest.Matchers.sameInstance;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Matchers.anyInt;
|
|
|
import static org.mockito.Matchers.anyString;
|
|
|
import static org.mockito.Matchers.argThat;
|
|
|
import static org.mockito.Matchers.eq;
|
|
@@ -135,20 +137,20 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
|
|
|
final SetOnce<Boolean> failure = new SetOnce<>();
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = (request, e) -> {
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
|
|
|
failure.set(true);
|
|
|
- assertThat(request, sameInstance(indexRequest));
|
|
|
+ assertThat(slot, equalTo(0));
|
|
|
assertThat(e, instanceOf(IllegalArgumentException.class));
|
|
|
assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist"));
|
|
|
};
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
|
|
|
assertTrue(failure.get());
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testUpdatePipelines() {
|
|
@@ -622,7 +624,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
final SetOnce<Boolean> failure = new SetOnce<>();
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id);
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = (request, e) -> {
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
|
|
|
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
|
|
|
assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class));
|
|
|
assertThat(e.getCause().getCause().getMessage(), equalTo("error"));
|
|
@@ -630,17 +632,17 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
};
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
|
|
|
assertTrue(failure.get());
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testExecuteBulkPipelineDoesNotExist() {
|
|
|
IngestService ingestService = createWithProcessors(Collections.singletonMap(
|
|
|
- "mock", (factories, tag, config) -> mock(CompoundProcessor.class)));
|
|
|
+ "mock", (factories, tag, config) -> mockCompoundProcessor()));
|
|
|
|
|
|
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
|
|
|
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
|
|
@@ -657,15 +659,16 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist");
|
|
|
bulkRequest.add(indexRequest2);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
|
|
|
+ completionHandler, indexReq -> {});
|
|
|
verify(failureHandler, times(1)).accept(
|
|
|
- argThat(new CustomTypeSafeMatcher<IndexRequest>("failure handler was not called with the expected arguments") {
|
|
|
+ argThat(new CustomTypeSafeMatcher<>("failure handler was not called with the expected arguments") {
|
|
|
@Override
|
|
|
- protected boolean matchesSafely(IndexRequest item) {
|
|
|
- return item == indexRequest2;
|
|
|
+ protected boolean matchesSafely(Integer item) {
|
|
|
+ return item == 1;
|
|
|
}
|
|
|
|
|
|
}),
|
|
@@ -676,12 +679,12 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
}
|
|
|
})
|
|
|
);
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testExecuteSuccess() {
|
|
|
IngestService ingestService = createWithProcessors(Collections.singletonMap(
|
|
|
- "mock", (factories, tag, config) -> mock(CompoundProcessor.class)));
|
|
|
+ "mock", (factories, tag, config) -> mockCompoundProcessor()));
|
|
|
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
|
|
|
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
|
|
|
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
|
|
@@ -690,12 +693,12 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
verify(failureHandler, never()).accept(any(), any());
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testExecuteEmptyPipeline() throws Exception {
|
|
@@ -708,16 +711,16 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
verify(failureHandler, never()).accept(any(), any());
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testExecutePropagateAllMetaDataUpdates() throws Exception {
|
|
|
- final CompoundProcessor processor = mock(CompoundProcessor.class);
|
|
|
+ final CompoundProcessor processor = mockCompoundProcessor();
|
|
|
IngestService ingestService = createWithProcessors(Collections.singletonMap(
|
|
|
"mock", (factories, tag, config) -> processor));
|
|
|
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
|
|
@@ -739,17 +742,21 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName());
|
|
|
}
|
|
|
}
|
|
|
- return ingestDocument;
|
|
|
- }).when(processor).execute(any());
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ BiConsumer<IngestDocument, Exception> handler = (BiConsumer<IngestDocument, Exception>) invocationOnMock.getArguments()[1];
|
|
|
+ handler.accept(ingestDocument, null);
|
|
|
+ return null;
|
|
|
+ }).when(processor).execute(any(), any());
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
- verify(processor).execute(any());
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ verify(processor).execute(any(), any());
|
|
|
verify(failureHandler, never()).accept(any(), any());
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
assertThat(indexRequest.index(), equalTo("update_index"));
|
|
|
assertThat(indexRequest.type(), equalTo("update_type"));
|
|
|
assertThat(indexRequest.id(), equalTo("update_id"));
|
|
@@ -759,7 +766,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testExecuteFailure() throws Exception {
|
|
|
- final CompoundProcessor processor = mock(CompoundProcessor.class);
|
|
|
+ final CompoundProcessor processor = mockCompoundProcessor();
|
|
|
IngestService ingestService = createWithProcessors(Collections.singletonMap(
|
|
|
"mock", (factories, tag, config) -> processor));
|
|
|
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
|
|
@@ -771,22 +778,37 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
doThrow(new RuntimeException())
|
|
|
.when(processor)
|
|
|
- .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
|
|
|
+ .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
- verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
|
|
|
- verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
|
|
|
+ verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testExecuteSuccessWithOnFailure() throws Exception {
|
|
|
final Processor processor = mock(Processor.class);
|
|
|
when(processor.getType()).thenReturn("mock_processor_type");
|
|
|
when(processor.getTag()).thenReturn("mock_processor_tag");
|
|
|
+ doAnswer(args -> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
|
|
|
+ handler.accept(null, new RuntimeException());
|
|
|
+ return null;
|
|
|
+ }).when(processor).execute(eqIndexTypeId(emptyMap()), any());
|
|
|
+
|
|
|
final Processor onFailureProcessor = mock(Processor.class);
|
|
|
+ doAnswer(args -> {
|
|
|
+ IngestDocument ingestDocument = (IngestDocument) args.getArguments()[0];
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
|
|
|
+ handler.accept(ingestDocument, null);
|
|
|
+ return null;
|
|
|
+ }).when(onFailureProcessor).execute(eqIndexTypeId(emptyMap()), any());
|
|
|
+
|
|
|
final CompoundProcessor compoundProcessor = new CompoundProcessor(
|
|
|
false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
|
|
|
IngestService ingestService = createWithProcessors(Collections.singletonMap(
|
|
@@ -798,14 +820,13 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
clusterState = IngestService.innerPut(putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
- doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap()));
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
- verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class));
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class));
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testExecuteFailureWithNestedOnFailure() throws Exception {
|
|
@@ -829,21 +850,21 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
doThrow(new RuntimeException())
|
|
|
.when(onFailureOnFailureProcessor)
|
|
|
- .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
|
|
|
+ .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
|
|
|
doThrow(new RuntimeException())
|
|
|
.when(onFailureProcessor)
|
|
|
- .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
|
|
|
+ .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
|
|
|
doThrow(new RuntimeException())
|
|
|
.when(processor)
|
|
|
- .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
|
|
|
+ .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
- verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()));
|
|
|
- verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class));
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
|
|
|
+ verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testBulkRequestExecutionWithFailures() throws Exception {
|
|
@@ -872,7 +893,12 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
CompoundProcessor processor = mock(CompoundProcessor.class);
|
|
|
when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class)));
|
|
|
Exception error = new RuntimeException();
|
|
|
- doThrow(error).when(processor).execute(any());
|
|
|
+ doAnswer(args -> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
|
|
|
+ handler.accept(null, error);
|
|
|
+ return null;
|
|
|
+ }).when(processor).execute(any(), any());
|
|
|
IngestService ingestService = createWithProcessors(Collections.singletonMap(
|
|
|
"mock", (factories, tag, config) -> processor));
|
|
|
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
|
|
@@ -883,18 +909,18 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class);
|
|
|
+ BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
|
|
|
|
|
|
- verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher<Exception>() {
|
|
|
+ verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(new ArgumentMatcher<Exception>() {
|
|
|
@Override
|
|
|
public boolean matches(final Object o) {
|
|
|
return ((Exception)o).getCause().getCause().equals(error);
|
|
|
}
|
|
|
}));
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
}
|
|
|
|
|
|
public void testBulkRequestExecution() throws Exception {
|
|
@@ -917,7 +943,12 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
final Processor processor = mock(Processor.class);
|
|
|
when(processor.getType()).thenReturn("mock");
|
|
|
when(processor.getTag()).thenReturn("mockTag");
|
|
|
- when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
|
|
|
+ doAnswer(args -> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
|
|
|
+ handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
|
|
|
+ return null;
|
|
|
+ }).when(processor).execute(any(), any());
|
|
|
Map<String, Processor.Factory> map = new HashMap<>(2);
|
|
|
map.put("mock", (factories, tag, config) -> processor);
|
|
|
|
|
@@ -930,13 +961,13 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- BiConsumer<IndexRequest, Exception> requestItemErrorHandler = mock(BiConsumer.class);
|
|
|
+ BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {});
|
|
|
|
|
|
verify(requestItemErrorHandler, never()).accept(any(), any());
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
for (DocWriteRequest<?> docWriteRequest : bulkRequest.requests()) {
|
|
|
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(docWriteRequest);
|
|
|
assertThat(indexRequest, notNullValue());
|
|
@@ -951,8 +982,18 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
when(processor.getTag()).thenReturn("mockTag");
|
|
|
when(processorFailure.getType()).thenReturn("failure-mock");
|
|
|
//avoid returning null and dropping the document
|
|
|
- when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random()));
|
|
|
- when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error"));
|
|
|
+ doAnswer(args -> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
|
|
|
+ handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
|
|
|
+ return null;
|
|
|
+ }).when(processor).execute(any(IngestDocument.class), any());
|
|
|
+ doAnswer(args -> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
|
|
|
+ handler.accept(null, new RuntimeException("error"));
|
|
|
+ return null;
|
|
|
+ }).when(processorFailure).execute(any(IngestDocument.class), any());
|
|
|
Map<String, Processor.Factory> map = new HashMap<>(2);
|
|
|
map.put("mock", (factories, tag, config) -> processor);
|
|
|
map.put("failure-mock", (factories, tag, config) -> processorFailure);
|
|
@@ -974,13 +1015,13 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
clusterState = IngestService.innerPut(putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
|
|
|
- @SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
- @SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
+ @SuppressWarnings("unchecked") final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ @SuppressWarnings("unchecked") final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index");
|
|
|
indexRequest.setPipeline("_id1");
|
|
|
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
final IngestStats afterFirstRequestStats = ingestService.stats();
|
|
|
assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2));
|
|
|
|
|
@@ -998,7 +1039,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
|
|
|
|
|
|
indexRequest.setPipeline("_id2");
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
final IngestStats afterSecondRequestStats = ingestService.stats();
|
|
|
assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2));
|
|
|
//total
|
|
@@ -1017,7 +1058,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
clusterState = IngestService.innerPut(putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
indexRequest.setPipeline("_id1");
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
final IngestStats afterThirdRequestStats = ingestService.stats();
|
|
|
assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2));
|
|
|
//total
|
|
@@ -1041,7 +1082,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
clusterState = IngestService.innerPut(putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
indexRequest.setPipeline("_id1");
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
|
|
|
final IngestStats afterForthRequestStats = ingestService.stats();
|
|
|
assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2));
|
|
|
//total
|
|
@@ -1107,15 +1148,15 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
+ final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<Exception> completionHandler = mock(Consumer.class);
|
|
|
+ final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- final Consumer<IndexRequest> dropHandler = mock(Consumer.class);
|
|
|
- ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
|
|
|
+ final IntConsumer dropHandler = mock(IntConsumer.class);
|
|
|
+ ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
|
|
|
verify(failureHandler, never()).accept(any(), any());
|
|
|
- verify(completionHandler, times(1)).accept(null);
|
|
|
- verify(dropHandler, times(1)).accept(indexRequest);
|
|
|
+ verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
|
|
|
+ verify(dropHandler, times(1)).accept(0);
|
|
|
}
|
|
|
|
|
|
public void testIngestClusterStateListeners_orderOfExecution() {
|
|
@@ -1157,7 +1198,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
|
|
|
- return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source));
|
|
|
+ return argThat(new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source));
|
|
|
}
|
|
|
|
|
|
private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map<String, Object> source) {
|
|
@@ -1193,6 +1234,17 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
}), client);
|
|
|
}
|
|
|
|
|
|
+ private CompoundProcessor mockCompoundProcessor() {
|
|
|
+ CompoundProcessor processor = mock(CompoundProcessor.class);
|
|
|
+ doAnswer(args -> {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
|
|
|
+ handler.accept((IngestDocument) args.getArguments()[0], null);
|
|
|
+ return null;
|
|
|
+ }).when(processor).execute(any(), any());
|
|
|
+ return processor;
|
|
|
+ }
|
|
|
+
|
|
|
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {
|
|
|
|
|
|
private final IngestDocument ingestDocument;
|