Browse Source

required changes after merge

Martijn van Groningen 6 years ago
parent
commit
eea64ff374

+ 27 - 6
server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

@@ -264,12 +264,19 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed"));
     }
 
-    public void testDropDocument() {
+    public void testDropDocument() throws Exception {
         TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
         Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of());
         Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
 
-        SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
+        executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
+            holder.set(r);
+            latch.countDown();
+        });
+        latch.await();
+        SimulateDocumentResult actualItemResponse = holder.get();
         assertThat(processor1.getInvokedCounter(), equalTo(1));
         assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
         SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
@@ -277,12 +284,19 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
     }
 
-    public void testDropDocumentVerbose() {
+    public void testDropDocumentVerbose() throws Exception {
         TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
         Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of());
         Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
 
-        SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
+        executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
+            holder.set(r);
+            latch.countDown();
+        });
+        latch.await();
+        SimulateDocumentResult actualItemResponse = holder.get();
         assertThat(processor1.getInvokedCounter(), equalTo(1));
         assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
         SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@@ -293,13 +307,20 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
     }
 
-    public void testDropDocumentVerboseExtraProcessor() {
+    public void testDropDocumentVerboseExtraProcessor() throws Exception {
         TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value"));
         Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of());
         TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value"));
         Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
 
-        SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
+        executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
+            holder.set(r);
+            latch.countDown();
+        });
+        latch.await();
+        SimulateDocumentResult actualItemResponse = holder.get();
         assertThat(processor1.getInvokedCounter(), equalTo(1));
         assertThat(processor3.getInvokedCounter(), equalTo(0));
         assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));