浏览代码

[Transform] improve irrecoverable error detection - part 2 (#52003)

base error handling on rest status instead of listing individual exception types

relates to #51820
Hendrik Muhs 5 年之前
父节点
当前提交
6ace2ef9cc

+ 51 - 32
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -19,7 +19,6 @@ import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.script.ScriptException;
@@ -42,7 +41,6 @@ import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
-import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;
 import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
 import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
 
@@ -287,7 +285,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
                     // If the transform config index or the transform config is gone, something serious occurred
                     // We are in an unknown state and should fail out
                     if (failure instanceof ResourceNotFoundException) {
-                        updateConfigListener.onFailure(new TransformConfigReloadingException(msg, failure));
+                        updateConfigListener.onFailure(new TransformConfigLostOnReloadException(msg, failure));
                     } else {
                         auditor.warning(getJobId(), msg);
                         updateConfigListener.onResponse(null);
@@ -477,37 +475,54 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
 
         if (unwrappedException instanceof CircuitBreakingException) {
             handleCircuitBreakingException((CircuitBreakingException) unwrappedException);
-        } else if (unwrappedException instanceof ScriptException) {
+            return;
+        }
+
+        if (unwrappedException instanceof ScriptException) {
             handleScriptException((ScriptException) unwrappedException);
-            // irrecoverable error without special handling
-        } else if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
+            return;
+        }
+
+        if (unwrappedException instanceof BulkIndexingException && ((BulkIndexingException) unwrappedException).isIrrecoverable()) {
             handleIrrecoverableBulkIndexingException((BulkIndexingException) unwrappedException);
-        } else if (unwrappedException instanceof IndexNotFoundException
-            || unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
-            || unwrappedException instanceof TransformConfigReloadingException
-            || unwrappedException instanceof ResourceNotFoundException
-            || unwrappedException instanceof IllegalArgumentException) {
-                failIndexer("task encountered irrecoverable failure: " + e.getMessage());
-            } else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
-                failIndexer(
-                    "task encountered more than "
-                        + context.getNumFailureRetries()
-                        + " failures; latest failure: "
-                        + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
-                );
-            } else {
-                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
-                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
-                if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
-                    String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
+            return;
+        }
 
-                    auditor.warning(
-                        getJobId(),
-                        "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
-                    );
-                    lastAuditedExceptionMessage = message;
-                }
+        // irrecoverable error without special handling
+        if (unwrappedException instanceof ElasticsearchException) {
+            ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedException;
+            if (ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
+                failIndexer("task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage());
+                return;
             }
+        }
+
+        if (unwrappedException instanceof IllegalArgumentException) {
+            failIndexer("task encountered irrecoverable failure: " + e.getMessage());
+            return;
+        }
+
+        if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
+            failIndexer(
+                "task encountered more than "
+                    + context.getNumFailureRetries()
+                    + " failures; latest failure: "
+                    + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
+            );
+            return;
+        }
+
+        // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
+        // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
+        if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
+            String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
+
+            auditor.warning(
+                getJobId(),
+                "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
+            );
+            lastAuditedExceptionMessage = message;
+        }
     }
 
     /**
@@ -901,8 +916,12 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
     }
 
-    static class TransformConfigReloadingException extends ElasticsearchException {
-        TransformConfigReloadingException(String msg, Throwable cause, Object... args) {
+    /**
+     * Thrown when the transform configuration disappeared permanently.
+     * (not if reloading failed due to an intermittent problem)
+     */
+    static class TransformConfigLostOnReloadException extends ResourceNotFoundException {
+        TransformConfigLostOnReloadException(String msg, Throwable cause, Object... args) {
             super(msg, cause, args);
         }
     }

+ 29 - 6
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java

@@ -7,19 +7,37 @@
 package org.elasticsearch.xpack.transform.utils;
 
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.ShardSearchFailure;
-import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.rest.RestStatus;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Set of static utils to find the cause of a search exception.
  */
 public final class ExceptionRootCauseFinder {
 
+    /**
+     * List of rest statuses that we consider irrecoverable
+     */
+    public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = new HashSet<>(
+        Arrays.asList(
+            RestStatus.GONE,
+            RestStatus.NOT_IMPLEMENTED,
+            RestStatus.NOT_FOUND,
+            RestStatus.BAD_REQUEST,
+            RestStatus.UNAUTHORIZED,
+            RestStatus.FORBIDDEN,
+            RestStatus.METHOD_NOT_ALLOWED,
+            RestStatus.NOT_ACCEPTABLE
+        )
+    );
+
     /**
      * Unwrap the exception stack and return the most likely cause.
      *
@@ -61,17 +79,22 @@ public final class ExceptionRootCauseFinder {
     /**
      * Return the first irrecoverableException from a collection of bulk responses if there are any.
      *
-     * @param failures a collection of bulk item responses
+     * @param failures a collection of bulk item responses with failures
      * @return The first exception considered irrecoverable if there are any, null if no irrecoverable exception found
      */
     public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collection<BulkItemResponse> failures) {
         for (BulkItemResponse failure : failures) {
             Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(failure.getFailure().getCause());
-            if (unwrappedThrowable instanceof MapperParsingException
-                || unwrappedThrowable instanceof IllegalArgumentException
-                || unwrappedThrowable instanceof ResourceNotFoundException) {
+            if (unwrappedThrowable instanceof IllegalArgumentException) {
                 return unwrappedThrowable;
             }
+
+            if (unwrappedThrowable instanceof ElasticsearchException) {
+                ElasticsearchException elasticsearchException = (ElasticsearchException) unwrappedThrowable;
+                if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
+                    return elasticsearchException;
+                }
+            }
         }
 
         return null;

+ 151 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java

@@ -0,0 +1,151 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.transform.utils;
+
+import org.elasticsearch.ElasticsearchSecurityException;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.DocWriteRequest.OpType;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.translog.TranslogException;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ExceptionRootCauseFinderTests extends ESTestCase {
+    public void testFetFirstIrrecoverableExceptionFromBulkResponses() {
+        Map<Integer, BulkItemResponse> bulkItemResponses = new HashMap<>();
+
+        int id = 1;
+        // 1
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure("the_index", "id", new MapperParsingException("mapper parsing error"))
+            )
+        );
+        // 2
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure("the_index", "id", new ResourceNotFoundException("resource not found error"))
+            )
+        );
+        // 3
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure("the_index", "id", new IllegalArgumentException("illegal argument error"))
+            )
+        );
+        // 4 not irrecoverable
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure("the_index", "id", new EsRejectedExecutionException("es rejected execution"))
+            )
+        );
+        // 5 not irrecoverable
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure("the_index", "id", new TranslogException(new ShardId("the_index", "uid", 0), "translog error"))
+            )
+        );
+        // 6
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure(
+                    "the_index",
+                    "id",
+                    new ElasticsearchSecurityException("Authentication required", RestStatus.UNAUTHORIZED)
+                )
+            )
+        );
+        // 7
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure(
+                    "the_index",
+                    "id",
+                    new ElasticsearchSecurityException("current license is non-compliant for [transform]", RestStatus.FORBIDDEN)
+                )
+            )
+        );
+        // 8 not irrecoverable
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure(
+                    "the_index",
+                    "id",
+                    new ElasticsearchSecurityException("overloaded, to many requests", RestStatus.TOO_MANY_REQUESTS)
+                )
+            )
+        );
+        // 9 not irrecoverable
+        bulkItemResponses.put(
+            id,
+            new BulkItemResponse(
+                id++,
+                OpType.INDEX,
+                new BulkItemResponse.Failure(
+                    "the_index",
+                    "id",
+                    new ElasticsearchSecurityException("internal error", RestStatus.INTERNAL_SERVER_ERROR)
+                )
+            )
+        );
+
+        assertFirstException(bulkItemResponses.values(), MapperParsingException.class, "mapper parsing error");
+        bulkItemResponses.remove(1);
+        assertFirstException(bulkItemResponses.values(), ResourceNotFoundException.class, "resource not found error");
+        bulkItemResponses.remove(2);
+        assertFirstException(bulkItemResponses.values(), IllegalArgumentException.class, "illegal argument error");
+        bulkItemResponses.remove(3);
+        assertFirstException(bulkItemResponses.values(), ElasticsearchSecurityException.class, "Authentication required");
+        bulkItemResponses.remove(6);
+        assertFirstException(
+            bulkItemResponses.values(),
+            ElasticsearchSecurityException.class,
+            "current license is non-compliant for [transform]"
+        );
+        bulkItemResponses.remove(7);
+
+        assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values()));
+    }
+
+    private static void assertFirstException(Collection<BulkItemResponse> bulkItemResponses, Class<?> expectedClass, String message) {
+        Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses);
+        assertNotNull(t);
+        assertEquals(t.getClass(), expectedClass);
+        assertEquals(t.getMessage(), message);
+    }
+}