瀏覽代碼

Correctly handle an exception case for ingest failure (#92455)

Joe Gallo 2 年之前
父節點
當前提交
9a3d39a1d4

+ 5 - 0
docs/changelog/92455.yaml

@@ -0,0 +1,5 @@
+pr: 92455
+summary: Correctly handle an exception case for ingest failure
+area: Ingest Node
+type: bug
+issues: []

+ 0 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/240_required_pipeline.yml → modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/240_final_pipeline.yml


+ 19 - 43
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -22,6 +22,7 @@ import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.support.CountDownActionListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -72,7 +73,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.IntConsumer;
@@ -687,15 +687,16 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
             @Override
             protected void doRun() {
                 final Thread originalThread = Thread.currentThread();
-                final AtomicInteger counter = new AtomicInteger(numberOfActionRequests);
+                final ActionListener<Void> onFinished = new CountDownActionListener(
+                    numberOfActionRequests,
+                    () -> onCompletion.accept(originalThread, null)
+                );
+
                 int i = 0;
                 for (DocWriteRequest<?> actionRequest : actionRequests) {
                     IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
                     if (indexRequest == null) {
-                        if (counter.decrementAndGet() == 0) {
-                            onCompletion.accept(originalThread, null);
-                        }
-                        assert counter.get() >= 0;
+                        onFinished.onResponse(null);
                         i++;
                         continue;
                     }
@@ -715,25 +716,12 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                     } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
                         pipelines = List.of(finalPipelineId);
                     } else {
-                        if (counter.decrementAndGet() == 0) {
-                            onCompletion.accept(originalThread, null);
-                        }
-                        assert counter.get() >= 0;
+                        onFinished.onResponse(null);
                         i++;
                         continue;
                     }
 
-                    executePipelines(
-                        i,
-                        pipelines.iterator(),
-                        hasFinalPipeline,
-                        indexRequest,
-                        onDropped,
-                        onFailure,
-                        counter,
-                        onCompletion,
-                        originalThread
-                    );
+                    executePipelines(i, pipelines.iterator(), hasFinalPipeline, indexRequest, onDropped, onFailure, onFinished);
 
                     i++;
                 }
@@ -748,9 +736,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         final IndexRequest indexRequest,
         final IntConsumer onDropped,
         final BiConsumer<Integer, Exception> onFailure,
-        final AtomicInteger counter,
-        final BiConsumer<Thread, Exception> onCompletion,
-        final Thread originalThread
+        final ActionListener<Void> onFinished
     ) {
         assert it.hasNext();
         final String pipelineId = it.next();
@@ -778,6 +764,9 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         e
                     );
                     onFailure.accept(slot, e);
+                    // document failed! no further processing of this doc
+                    onFinished.onResponse(null);
+                    return;
                 }
 
                 Iterator<String> newIt = it;
@@ -791,6 +780,9 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                             slot,
                             new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index")
                         );
+                        // document failed! no further processing of this doc
+                        onFinished.onResponse(null);
+                        return;
                     } else {
                         indexRequest.isPipelineResolved(false);
                         resolvePipelines(null, indexRequest, state.metadata());
@@ -804,22 +796,9 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                 }
 
                 if (newIt.hasNext()) {
-                    executePipelines(
-                        slot,
-                        newIt,
-                        newHasFinalPipeline,
-                        indexRequest,
-                        onDropped,
-                        onFailure,
-                        counter,
-                        onCompletion,
-                        originalThread
-                    );
+                    executePipelines(slot, newIt, newHasFinalPipeline, indexRequest, onDropped, onFailure, onFinished);
                 } else {
-                    if (counter.decrementAndGet() == 0) {
-                        onCompletion.accept(originalThread, null);
-                    }
-                    assert counter.get() >= 0;
+                    onFinished.onResponse(null);
                 }
             });
         } catch (Exception e) {
@@ -828,10 +807,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                 e
             );
             onFailure.accept(slot, e);
-            if (counter.decrementAndGet() == 0) {
-                onCompletion.accept(originalThread, null);
-            }
-            assert counter.get() >= 0;
+            onFinished.onResponse(null);
         }
     }
 

+ 20 - 7
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -1136,20 +1136,34 @@ public class IngestServiceTests extends ESTestCase {
 
     public void testExecuteFailure() throws Exception {
         final CompoundProcessor processor = mockCompoundProcessor();
-        IngestService ingestService = createWithProcessors(Map.of("mock", (factories, tag, description, config) -> processor));
-        PutPipelineRequest putRequest = new PutPipelineRequest(
-            "_id",
+        IngestService ingestService = createWithProcessors(
+            Map.of(
+                "mock",
+                (factories, tag, description, config) -> processor,
+                "set",
+                (factories, tag, description, config) -> new FakeProcessor("set", "", "", (ingestDocument) -> fail())
+            )
+        );
+        PutPipelineRequest putRequest1 = new PutPipelineRequest(
+            "_id1",
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
             XContentType.JSON
         );
+        // given that set -> fail() above, it's a failure if a document executes against this pipeline
+        PutPipelineRequest putRequest2 = new PutPipelineRequest(
+            "_id2",
+            new BytesArray("{\"processors\": [{\"set\" : {}}]}"),
+            XContentType.JSON
+        );
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = executePut(putRequest, clusterState);
+        clusterState = executePut(putRequest1, clusterState);
+        clusterState = executePut(putRequest2, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
             .source(Map.of())
-            .setPipeline("_id")
-            .setFinalPipeline("_none");
+            .setPipeline("_id1")
+            .setFinalPipeline("_id2");
         doThrow(new RuntimeException()).when(processor)
             .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
         @SuppressWarnings("unchecked")
@@ -2057,7 +2071,6 @@ public class IngestServiceTests extends ESTestCase {
     }
 
     private static IngestService createWithProcessors(Map<String, Processor.Factory> processors) {
-
         Client client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
         when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);