浏览代码

[ML] only retry persistence failures when the failure is intermittent and stop retrying when analytics job is stopping (#53725)

This fixes two issues:


- Results persister would retry actions even if they are not intermittent. An example of an persistent failure is a doc mapping problem.
- Data frame analytics would continue to retry to persist results even after the job is stopped.

closes https://github.com/elastic/elasticsearch/issues/53687
Benjamin Trent 5 年之前
父节点
当前提交
783df3f961

+ 3 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java

@@ -108,6 +108,7 @@ public class AnalyticsResultProcessor {
     }
 
     public void cancel() {
+        dataFrameRowsJoiner.cancel();
         isCancelled = true;
     }
 
@@ -265,12 +266,12 @@ public class AnalyticsResultProcessor {
                 new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
                 WriteRequest.RefreshPolicy.IMMEDIATE,
                 docIdSupplier.apply(analytics.getId()),
-                () -> true,
+                () -> isCancelled == false,
                 errorMsg -> auditor.error(analytics.getId(),
                     "failed to persist result with id [" + docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg)
             );
         } catch (IOException ioe) {
-            LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), ioe);
+            LOGGER.error(() -> new ParameterizedMessage("[{}] Failed serializing stats result", analytics.getId()), ioe);
         } catch (Exception e) {
             LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e);
         }

+ 11 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameRowsJoiner.java

@@ -40,9 +40,9 @@ class DataFrameRowsJoiner implements AutoCloseable {
     private final Iterator<DataFrameDataExtractor.Row> dataFrameRowsIterator;
     private LinkedList<RowResults> currentResults;
     private volatile String failure;
+    private volatile boolean isCancelled;
 
-    DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor,
-                        ResultsPersisterService resultsPersisterService) {
+    DataFrameRowsJoiner(String analyticsId, DataFrameDataExtractor dataExtractor, ResultsPersisterService resultsPersisterService) {
         this.analyticsId = Objects.requireNonNull(analyticsId);
         this.dataExtractor = Objects.requireNonNull(dataExtractor);
         this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
@@ -70,6 +70,10 @@ class DataFrameRowsJoiner implements AutoCloseable {
         }
     }
 
+    void cancel() {
+        isCancelled = true;
+    }
+
     private void addResultAndJoinIfEndOfBatch(RowResults rowResults) {
         currentResults.add(rowResults);
         if (currentResults.size() == RESULTS_BATCH_SIZE) {
@@ -87,7 +91,11 @@ class DataFrameRowsJoiner implements AutoCloseable {
         }
         if (bulkRequest.numberOfActions() > 0) {
             resultsPersisterService.bulkIndexWithHeadersWithRetry(
-                dataExtractor.getHeaders(), bulkRequest, analyticsId, () -> true, errorMsg -> {});
+                dataExtractor.getHeaders(),
+                bulkRequest,
+                analyticsId,
+                () -> isCancelled == false,
+                errorMsg -> {});
         }
         currentResults = new LinkedList<>();
     }

+ 51 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java

@@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -33,6 +34,8 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -42,6 +45,22 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class ResultsPersisterService {
+    /**
+     * List of rest statuses that we consider irrecoverable
+     */
+    public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Collections.unmodifiableSet(new HashSet<>(
+        Arrays.asList(
+            RestStatus.GONE,
+            RestStatus.NOT_IMPLEMENTED,
+            RestStatus.NOT_FOUND,
+            RestStatus.BAD_REQUEST,
+            RestStatus.UNAUTHORIZED,
+            RestStatus.FORBIDDEN,
+            RestStatus.METHOD_NOT_ALLOWED,
+            RestStatus.NOT_ACCEPTABLE
+        )
+    ));
+
     private static final Logger LOGGER = LogManager.getLogger(ResultsPersisterService.class);
 
     public static final Setting<Integer> PERSIST_RESULTS_MAX_RETRIES = Setting.intSetting(
@@ -124,9 +143,23 @@ public class ResultsPersisterService {
             if (bulkResponse.hasFailures() == false) {
                 return bulkResponse;
             }
-
+            for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
+                if (itemResponse.isFailed()) {
+                    if (isIrrecoverable(itemResponse.getFailure().getCause())) {
+                        Throwable unwrappedParticular = ExceptionsHelper.unwrapCause(itemResponse.getFailure().getCause());
+                        LOGGER.warn(new ParameterizedMessage(
+                            "[{}] experienced failure that cannot be automatically retried. Bulk failure message [{}]",
+                                jobId,
+                                bulkResponse.buildFailureMessage()),
+                            unwrappedParticular);
+                        throw new ElasticsearchException(
+                            "{} experienced failure that cannot be automatically retried. See logs for bulk failures",
+                            unwrappedParticular,
+                            jobId);
+                    }
+                }
+            }
             retryContext.nextIteration("index", bulkResponse.buildFailureMessage());
-
             // We should only retry the docs that failed.
             bulkRequest = buildNewRequestFromFailures(bulkRequest, bulkResponse);
         }
@@ -148,12 +181,28 @@ public class ResultsPersisterService {
             } catch (ElasticsearchException e) {
                 LOGGER.warn("[" + jobId + "] Exception while executing search action", e);
                 failureMessage = e.getDetailedMessage();
+                if (isIrrecoverable(e)) {
+                    LOGGER.warn(new ParameterizedMessage("[{}] experienced failure that cannot be automatically retried", jobId), e);
+                    throw new ElasticsearchException("{} experienced failure that cannot be automatically retried", e, jobId);
+                }
             }
 
             retryContext.nextIteration("search", failureMessage);
         }
     }
 
+    /**
+     * @param ex The exception to check
+     * @return true when the failure will persist no matter how many times we retry.
+     */
+    private static boolean isIrrecoverable(Exception ex) {
+        Throwable t = ExceptionsHelper.unwrapCause(ex);
+        if (t instanceof ElasticsearchException) {
+            return IRRECOVERABLE_REST_STATUSES.contains(((ElasticsearchException) t).status());
+        }
+        return false;
+    }
+
     /**
      * {@link RetryContext} object handles logic that is executed between consecutive retries of an action.
      *

+ 53 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterServiceTests.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.utils.persistence;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkAction;
@@ -27,8 +28,10 @@ import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
@@ -131,7 +134,8 @@ public class ResultsPersisterServiceTests extends ESTestCase {
     }
 
     public void testSearchWithRetries_SuccessAfterRetryDueToException() {
-        doThrow(new IndexNotFoundException("my-index")).doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
+        doThrow(new IndexPrimaryShardNotAllocatedException(new Index("my-index", "UUID")))
+            .doAnswer(withResponse(SEARCH_RESPONSE_SUCCESS))
             .when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
 
         List<String> messages = new ArrayList<>();
@@ -206,6 +210,21 @@ public class ResultsPersisterServiceTests extends ESTestCase {
         verify(client, times(maxRetries + 1)).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
     }
 
+    public void testSearchWithRetries_FailureOnIrrecoverableError() {
+        resultsPersisterService.setMaxFailureRetries(5);
+
+        doAnswer(withFailure(new ElasticsearchStatusException("bad search request", RestStatus.BAD_REQUEST)))
+            .when(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
+
+        ElasticsearchException e =
+            expectThrows(
+                ElasticsearchException.class,
+                () -> resultsPersisterService.searchWithRetry(SEARCH_REQUEST, JOB_ID, () -> true, (s) -> {}));
+        assertThat(e.getMessage(), containsString("experienced failure that cannot be automatically retried"));
+
+        verify(client).execute(eq(SearchAction.INSTANCE), eq(SEARCH_REQUEST), any());
+    }
+
     private static Supplier<Boolean> shouldRetryUntil(int maxRetries) {
         return new Supplier<>() {
             int retries = 0;
@@ -240,6 +259,29 @@ public class ResultsPersisterServiceTests extends ESTestCase {
         assertThat(lastMessage.get(), containsString("failed to index after [1] attempts. Will attempt again in"));
     }
 
+    public void testBulkRequestChangeOnIrrecoverableFailures() {
+        int maxFailureRetries = 10;
+        resultsPersisterService.setMaxFailureRetries(maxFailureRetries);
+        BulkItemResponse irrecoverable = new BulkItemResponse(
+            2,
+            DocWriteRequest.OpType.INDEX,
+            new BulkItemResponse.Failure("my-index", "fail", new ElasticsearchStatusException("boom", RestStatus.BAD_REQUEST)));
+        doAnswerWithResponses(
+            new BulkResponse(new BulkItemResponse[]{irrecoverable, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
+            new BulkResponse(new BulkItemResponse[0], 0L))
+            .when(client).execute(eq(BulkAction.INSTANCE), any(), any());
+
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(INDEX_REQUEST_FAILURE);
+        bulkRequest.add(INDEX_REQUEST_SUCCESS);
+
+        ElasticsearchException ex = expectThrows(ElasticsearchException.class,
+            () -> resultsPersisterService.bulkIndexWithRetry(bulkRequest, JOB_ID, () -> true, (s)->{}));
+
+        verify(client).execute(eq(BulkAction.INSTANCE), any(), any());
+        assertThat(ex.getMessage(), containsString("experienced failure that cannot be automatically retried."));
+    }
+
     public void testBulkRequestDoesNotRetryWhenSupplierIsFalse() {
         doAnswerWithResponses(
                 new BulkResponse(new BulkItemResponse[]{BULK_ITEM_RESPONSE_FAILURE, BULK_ITEM_RESPONSE_SUCCESS}, 0L),
@@ -315,6 +357,15 @@ public class ResultsPersisterServiceTests extends ESTestCase {
         };
     }
 
+    @SuppressWarnings("unchecked")
+    private static <Response> Answer<Response> withFailure(Exception failure) {
+        return invocationOnMock -> {
+            ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
+            listener.onFailure(failure);
+            return null;
+        };
+    }
+
     private static ResultsPersisterService buildResultsPersisterService(OriginSettingClient client) {
         CheckedConsumer<Integer, InterruptedException> sleeper = millis -> {};
         ThreadPool tp = mock(ThreadPool.class);