Pārlūkot izejas kodu

[ML] Report cause when datafeed extraction encounters error (#66167)

When a datafeed encounters errors extracting data, often
the error is an instance of `SearchPhaseExecutionException`.
In that case the top level error message is `Partial shards failure`
which is not very informative.

This commit refactors a transform util method from
`ExceptionRootCauseFinder`, which unwraps exceptions with special
handling for `SearchPhaseExecutionException`, and makes use of
it from datafeed `ProblemTracker` in order to provide a more
useful error message.
Dimitris Athanasiou 4 gadi atpakaļ
vecāks
revīzija
f8bda89c03

+ 31 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java

@@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.rest.RestStatus;
@@ -112,7 +113,36 @@ public class ExceptionsHelper {
         return requireNonNull(obj, paramName.getPreferredName());
     }
 
+    /**
+     * @see org.elasticsearch.ExceptionsHelper#unwrapCause(Throwable)
+     */
     public static Throwable unwrapCause(Throwable t) {
-       return org.elasticsearch.ExceptionsHelper.unwrapCause(t);
+        return org.elasticsearch.ExceptionsHelper.unwrapCause(t);
+    }
+
+    /**
+     * Unwraps {@code t} and returns the most likely cause. When the unwrapped throwable is a
+     * {@link SearchPhaseExecutionException}, the unwrapped cause of the first shard failure that
+     * is an {@link ElasticsearchException} is returned; otherwise the unwrapped throwable is.
+     *
+     * @param t raw Throwable
+     * @return the chosen shard failure cause, or the unwrapped throwable when none qualifies
+     */
+    public static Throwable findSearchExceptionRootCause(Throwable t) {
+        // unwrap first so that a SearchPhaseExecutionException hidden behind a wrapper is detected below
+        Throwable unwrappedThrowable = unwrapCause(t);
+
+        if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
+            SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) unwrappedThrowable;
+            for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
+                Throwable unwrappedShardFailure = unwrapCause(shardFailure.getCause());
+
+                if (unwrappedShardFailure instanceof ElasticsearchException) {
+                    return unwrappedShardFailure;
+                }
+            }
+        }
+
+        return unwrappedThrowable;
     }
 }

+ 7 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/utils/ExceptionsHelper.java

@@ -19,4 +19,11 @@ public class ExceptionsHelper {
         }
         return obj;
     }
+
+    /**
+     * @see org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper#findSearchExceptionRootCause(Throwable)
+     */
+    public static Throwable findSearchExceptionRootCause(Throwable t) {
+        return org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper.findSearchExceptionRootCause(t);
+    }
 }

+ 43 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelperTests.java

@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.ml.utils;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.indices.IndexCreationException;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.sameInstance;
+
+public class ExceptionsHelperTests extends ESTestCase {
+
+    public void testFindSearchExceptionRootCause_GivenWrappedSearchPhaseException() {
+        SearchPhaseExecutionException searchPhaseExecutionException = new SearchPhaseExecutionException("test-phase",
+            "partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchException("for the cause!")) });
+
+        Throwable rootCauseException = ExceptionsHelper.findSearchExceptionRootCause(
+            new IndexCreationException("test-index", searchPhaseExecutionException));
+
+        assertThat(rootCauseException.getMessage(), equalTo("for the cause!"));
+    }
+
+    public void testFindSearchExceptionRootCause_GivenRuntimeException() {
+        RuntimeException runtimeException = new RuntimeException("nothing to unwrap here");
+        assertThat(ExceptionsHelper.findSearchExceptionRootCause(runtimeException), sameInstance(runtimeException));
+    }
+
+    public void testFindSearchExceptionRootCause_GivenWrapperException() {
+        RuntimeException runtimeException = new RuntimeException("cause");
+
+        Throwable rootCauseException = ExceptionsHelper.findSearchExceptionRootCause(
+            new IndexCreationException("test-index", runtimeException));
+
+        assertThat(rootCauseException.getMessage(), equalTo("cause"));
+    }
+}

+ 7 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -7,7 +7,10 @@ package org.elasticsearch.xpack.ml.datafeed;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.ElasticsearchWrapperException;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Streams;
@@ -318,7 +321,7 @@ class DatafeedJob {
             try {
                 extractedData = dataExtractor.next();
             } catch (Exception e) {
-                LOGGER.debug("[" + jobId + "] error while extracting data", e);
+                LOGGER.error(new ParameterizedMessage("[{}] error while extracting data", jobId), e);
                 // When extraction problems are encountered, we do not want to advance time.
                 // Instead, it is preferable to retry the given interval next time an extraction
                 // is triggered.
@@ -350,7 +353,7 @@ class DatafeedJob {
                     if (isIsolated) {
                         return;
                     }
-                    LOGGER.debug("[" + jobId + "] error while posting data", e);
+                    LOGGER.error(new ParameterizedMessage("[{}] error while posting data", jobId), e);
 
                     // a conflict exception means the job state is not open any more.
                     // we should therefore stop the datafeed.
@@ -469,7 +472,7 @@ class DatafeedJob {
         return lastEndTimeMs;
     }
 
-    static class AnalysisProblemException extends RuntimeException {
+    static class AnalysisProblemException extends ElasticsearchException implements ElasticsearchWrapperException {
 
         final boolean shouldStop;
         final long nextDelayInMsSinceEpoch;
@@ -481,7 +484,7 @@ class DatafeedJob {
         }
     }
 
-    static class ExtractionProblemException extends RuntimeException {
+    static class ExtractionProblemException extends ElasticsearchException implements ElasticsearchWrapperException {
 
         final long nextDelayInMsSinceEpoch;
 

+ 4 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

@@ -184,12 +184,12 @@ public class DatafeedManager {
                     if (endTime == null) {
                         next = e.nextDelayInMsSinceEpoch;
                     }
-                    holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
+                    holder.problemTracker.reportExtractionProblem(e);
                 } catch (DatafeedJob.AnalysisProblemException e) {
                     if (endTime == null) {
                         next = e.nextDelayInMsSinceEpoch;
                     }
-                    holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
+                    holder.problemTracker.reportAnalysisProblem(e);
                     if (e.shouldStop) {
                         holder.stop("lookback_analysis_error", TimeValue.timeValueSeconds(20), e);
                         return;
@@ -241,10 +241,10 @@ public class DatafeedManager {
                         holder.problemTracker.reportNonEmptyDataCount();
                     } catch (DatafeedJob.ExtractionProblemException e) {
                         nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
-                        holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
+                        holder.problemTracker.reportExtractionProblem(e);
                     } catch (DatafeedJob.AnalysisProblemException e) {
                         nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
-                        holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
+                        holder.problemTracker.reportAnalysisProblem(e);
                         if (e.shouldStop) {
                             holder.stop("realtime_analysis_error", TimeValue.timeValueSeconds(20), e);
                             return;

+ 7 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.datafeed;
 
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 
 import java.util.Objects;
@@ -42,19 +43,19 @@ class ProblemTracker {
     /**
      * Reports as analysis problem if it is different than the last seen problem
      *
-     * @param problemMessage the problem message
+     * @param error the analysis problem; the message of its unwrapped cause is what gets reported
      */
-    public void reportAnalysisProblem(String problemMessage) {
-        reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR, problemMessage);
+    public void reportAnalysisProblem(DatafeedJob.AnalysisProblemException error) {
+        reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR, ExceptionsHelper.unwrapCause(error).getMessage());
     }
 
     /**
      * Reports as extraction problem if it is different than the last seen problem
      *
-     * @param problemMessage the problem message
+     * @param error the extraction problem; the message found by {@code findSearchExceptionRootCause} is reported
      */
-    public void reportExtractionProblem(String problemMessage) {
-        reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR, problemMessage);
+    public void reportExtractionProblem(DatafeedJob.ExtractionProblemException error) {
+        reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR, ExceptionsHelper.findSearchExceptionRootCause(error).getMessage());
     }
 
     /**

+ 44 - 11
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java

@@ -5,6 +5,10 @@
  */
 package org.elasticsearch.xpack.ml.datafeed;
 
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchWrapperException;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.junit.Before;
@@ -27,33 +31,43 @@ public class ProblemTrackerTests extends ESTestCase {
     }
 
     public void testReportExtractionProblem() {
-        problemTracker.reportExtractionProblem("foo");
+        problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
 
-        verify(auditor).error("foo", "Datafeed is encountering errors extracting data: foo");
+        verify(auditor).error("foo", "Datafeed is encountering errors extracting data: cause");
+        assertTrue(problemTracker.hasProblems());
+    }
+
+    public void testReportExtractionProblem_GivenSearchPhaseExecutionException() {
+        SearchPhaseExecutionException searchPhaseExecutionException = new SearchPhaseExecutionException("test-phase",
+            "partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchException("for the cause!")) });
+
+        problemTracker.reportExtractionProblem(new DatafeedJob.ExtractionProblemException(0L, searchPhaseExecutionException));
+
+        verify(auditor).error("foo", "Datafeed is encountering errors extracting data: for the cause!");
         assertTrue(problemTracker.hasProblems());
     }
 
     public void testReportAnalysisProblem() {
-        problemTracker.reportAnalysisProblem("foo");
+        problemTracker.reportAnalysisProblem(createAnalysisProblem("top level", "cause"));
 
-        verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: foo");
+        verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: cause");
         assertTrue(problemTracker.hasProblems());
     }
 
     public void testReportProblem_GivenSameProblemTwice() {
-        problemTracker.reportExtractionProblem("foo");
-        problemTracker.reportAnalysisProblem("foo");
+        problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
+        problemTracker.reportAnalysisProblem(createAnalysisProblem("top level", "cause"));
 
-        verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo");
+        verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: cause");
         assertTrue(problemTracker.hasProblems());
     }
 
     public void testReportProblem_GivenSameProblemAfterFinishReport() {
-        problemTracker.reportExtractionProblem("foo");
+        problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
         problemTracker.finishReport();
-        problemTracker.reportExtractionProblem("foo");
+        problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
 
-        verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo");
+        verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: cause");
         assertTrue(problemTracker.hasProblems());
     }
 
@@ -108,7 +122,7 @@ public class ProblemTrackerTests extends ESTestCase {
     }
 
     public void testFinishReport_GivenRecovery() {
-        problemTracker.reportExtractionProblem("bar");
+        problemTracker.reportExtractionProblem(createExtractionProblem("top level", "bar"));
         problemTracker.finishReport();
         problemTracker.finishReport();
 
@@ -116,4 +130,23 @@ public class ProblemTrackerTests extends ESTestCase {
         verify(auditor).info("foo", "Datafeed has recovered data extraction and analysis");
         assertFalse(problemTracker.hasProblems());
     }
+
+    private static DatafeedJob.ExtractionProblemException createExtractionProblem(String error, String cause) {
+        Exception causeException = new RuntimeException(cause);
+        Exception wrappedException = new TestWrappedException(error, causeException);
+        return new DatafeedJob.ExtractionProblemException(0L, wrappedException);
+    }
+
+    private static DatafeedJob.AnalysisProblemException createAnalysisProblem(String error, String cause) {
+        Exception causeException = new RuntimeException(cause);
+        Exception wrappedException = new TestWrappedException(error, causeException);
+        return new DatafeedJob.AnalysisProblemException(0L, false, wrappedException);
+    }
+
+    private static class TestWrappedException extends RuntimeException implements ElasticsearchWrapperException {
+
+        TestWrappedException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
 }

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -631,7 +631,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
 
     synchronized void handleFailure(Exception e) {
         logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e);
-        Throwable unwrappedException = ExceptionRootCauseFinder.getRootCauseException(e);
+        Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
 
         if (unwrappedException instanceof CircuitBreakingException) {
             handleCircuitBreakingException((CircuitBreakingException) unwrappedException);

+ 0 - 26
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java

@@ -8,8 +8,6 @@ package org.elasticsearch.xpack.transform.utils;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
-import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.rest.RestStatus;
 
 import java.util.Arrays;
@@ -38,30 +36,6 @@ public final class ExceptionRootCauseFinder {
         )
     );
 
-    /**
-     * Unwrap the exception stack and return the most likely cause.
-     *
-     * @param t raw Throwable
-     * @return unwrapped throwable if possible
-     */
-    public static Throwable getRootCauseException(Throwable t) {
-        // circuit breaking exceptions are at the bottom
-        Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(t);
-
-        if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
-            SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) unwrappedThrowable;
-            for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
-                Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());
-
-                if (unwrappedShardFailure instanceof ElasticsearchException) {
-                    return unwrappedShardFailure;
-                }
-            }
-        }
-
-        return t;
-    }
-
     /**
      * Return the best error message possible given a already unwrapped exception.
      *

+ 0 - 16
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java

@@ -6,18 +6,14 @@
 
 package org.elasticsearch.xpack.transform.utils;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.DocWriteRequest.OpType;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
-import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.TranslogException;
-import org.elasticsearch.indices.IndexCreationException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 
@@ -25,8 +21,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.hamcrest.Matchers.equalTo;
-
 public class ExceptionRootCauseFinderTests extends ESTestCase {
 
     public void testFetFirstIrrecoverableExceptionFromBulkResponses() {
@@ -149,16 +143,6 @@ public class ExceptionRootCauseFinderTests extends ESTestCase {
         assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values()));
     }
 
-    public void testGetRootCauseException_GivenWrappedSearchPhaseException() {
-        SearchPhaseExecutionException searchPhaseExecutionException = new SearchPhaseExecutionException("test-phase",
-            "partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchException("for the cause!")) });
-
-        Throwable rootCauseException = ExceptionRootCauseFinder.getRootCauseException(
-            new IndexCreationException("test-index", searchPhaseExecutionException));
-
-        assertThat(rootCauseException.getMessage(), equalTo("for the cause!"));
-    }
-
     private static void assertFirstException(Collection<BulkItemResponse> bulkItemResponses, Class<?> expectedClass, String message) {
         Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses);
         assertNotNull(t);