Bladeren bron

Retry on ClusterBlockException on transform destination index (#118194) (#118581)

* Retry on ClusterBlockException on transform destination index

* Update docs/changelog/118194.yaml

* Cleaning up tests

* Fixing tests

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Dan Rubinstein 10 maanden geleden
bovenliggende
commit
53ba1f5d0b

+ 5 - 0
docs/changelog/118194.yaml

@@ -0,0 +1,5 @@
+pr: 118194
+summary: Retry on `ClusterBlockException` on transform destination index
+area: Machine Learning
+type: enhancement
+issues: []

+ 43 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandler.java

@@ -169,7 +169,14 @@ class TransformFailureHandler {
      * @param numFailureRetries the number of configured retries
      */
     private void handleBulkIndexingException(BulkIndexingException bulkIndexingException, boolean unattended, int numFailureRetries) {
-        if (unattended == false && bulkIndexingException.isIrrecoverable()) {
+        if (bulkIndexingException.getCause() instanceof ClusterBlockException) {
+            retryWithoutIncrementingFailureCount(
+                bulkIndexingException,
+                bulkIndexingException.getDetailedMessage(),
+                unattended,
+                numFailureRetries
+            );
+        } else if (unattended == false && bulkIndexingException.isIrrecoverable()) {
             String message = TransformMessages.getMessage(
                 TransformMessages.LOG_TRANSFORM_PIVOT_IRRECOVERABLE_BULK_INDEXING_ERROR,
                 bulkIndexingException.getDetailedMessage()
@@ -232,12 +239,46 @@ class TransformFailureHandler {
             && unwrappedException.getClass().equals(context.getLastFailure().getClass());
 
         final int failureCount = context.incrementAndGetFailureCount(unwrappedException);
-
         if (unattended == false && numFailureRetries != -1 && failureCount > numFailureRetries) {
             fail(unwrappedException, "task encountered more than " + numFailureRetries + " failures; latest failure: " + message);
             return;
         }
 
+        logRetry(unwrappedException, message, unattended, numFailureRetries, failureCount, repeatedFailure);
+    }
+
+    /**
+     * Terminate failure handling without incrementing the retries used
+     * <p>
+     * This is used when there is an ongoing recoverable issue and we want to retain
+     * retries for any issues that may occur after the issue is resolved
+     *
+     * @param unwrappedException The exception caught
+     * @param message error message to log/audit
+     * @param unattended whether the transform runs in unattended mode
+     * @param numFailureRetries the number of configured retries
+     */
+    private void retryWithoutIncrementingFailureCount(
+        Throwable unwrappedException,
+        String message,
+        boolean unattended,
+        int numFailureRetries
+    ) {
+        // group failures to decide whether to report it below
+        final boolean repeatedFailure = context.getLastFailure() != null
+            && unwrappedException.getClass().equals(context.getLastFailure().getClass());
+
+        logRetry(unwrappedException, message, unattended, numFailureRetries, context.getFailureCount(), repeatedFailure);
+    }
+
+    private void logRetry(
+        Throwable unwrappedException,
+        String message,
+        boolean unattended,
+        int numFailureRetries,
+        int failureCount,
+        boolean repeatedFailure
+    ) {
         // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
         // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
         // and if the number of retries is about to exceed

+ 172 - 59
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformFailureHandlerTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -63,9 +64,121 @@ public class TransformFailureHandlerTests extends ESTestCase {
         }
     }
 
-    public void testUnattended() {
+    public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeLessThanMinimumPageSize() {
+        var e = new CircuitBreakingException(randomAlphaOfLength(10), 1, 0, randomFrom(CircuitBreaker.Durability.values()));
+        assertRetryIfUnattendedOtherwiseFail(e);
+    }
+
+    public void testHandleIndexerFailure_CircuitBreakingExceptionNewPageSizeNotLessThanMinimumPageSize() {
+        var e = new CircuitBreakingException(randomAlphaOfLength(10), 1, 1, randomFrom(CircuitBreaker.Durability.values()));
+
+        List.of(true, false).forEach((unattended) -> { assertNoFailureAndContextPageSizeSet(e, unattended, 365); });
+    }
+
+    public void testHandleIndexerFailure_ScriptException() {
+        var e = new ScriptException(
+            randomAlphaOfLength(10),
+            new ArithmeticException(randomAlphaOfLength(10)),
+            singletonList(randomAlphaOfLength(10)),
+            randomAlphaOfLength(10),
+            randomAlphaOfLength(10)
+        );
+        assertRetryIfUnattendedOtherwiseFail(e);
+    }
+
+    public void testHandleIndexerFailure_BulkIndexExceptionWrappingClusterBlockException() {
+        final BulkIndexingException bulkIndexingException = new BulkIndexingException(
+            randomAlphaOfLength(10),
+            new ClusterBlockException(Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))),
+            randomBoolean()
+        );
+
+        List.of(true, false).forEach((unattended) -> { assertRetryFailureCountNotIncremented(bulkIndexingException, unattended); });
+    }
+
+    public void testHandleIndexerFailure_IrrecoverableBulkIndexException() {
+        final BulkIndexingException e = new BulkIndexingException(
+            randomAlphaOfLength(10),
+            new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR),
+            true
+        );
+        assertRetryIfUnattendedOtherwiseFail(e);
+    }
+
+    public void testHandleIndexerFailure_RecoverableBulkIndexException() {
+        final BulkIndexingException bulkIndexingException = new BulkIndexingException(
+            randomAlphaOfLength(10),
+            new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR),
+            false
+        );
+
+        List.of(true, false).forEach((unattended) -> { assertRetry(bulkIndexingException, unattended); });
+    }
+
+    public void testHandleIndexerFailure_ClusterBlockException() {
+        List.of(true, false).forEach((unattended) -> {
+            assertRetry(
+                new ClusterBlockException(Map.of(randomAlphaOfLength(10), Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))),
+                unattended
+            );
+        });
+    }
+
+    public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithNoShardSearchFailures() {
+        List.of(true, false).forEach((unattended) -> {
+            assertRetry(
+                new SearchPhaseExecutionException(randomAlphaOfLength(10), randomAlphaOfLength(10), ShardSearchFailure.EMPTY_ARRAY),
+                unattended
+            );
+        });
+    }
+
+    public void testHandleIndexerFailure_SearchPhaseExecutionExceptionWithShardSearchFailures() {
+        List.of(true, false).forEach((unattended) -> {
+            assertRetry(
+                new SearchPhaseExecutionException(
+                    randomAlphaOfLength(10),
+                    randomAlphaOfLength(10),
+                    new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
+                ),
+                unattended
+            );
+        });
+    }
+
+    public void testHandleIndexerFailure_RecoverableElasticsearchException() {
+        List.of(true, false).forEach((unattended) -> {
+            assertRetry(new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.INTERNAL_SERVER_ERROR), unattended);
+        });
+    }
+
+    public void testHandleIndexerFailure_IrrecoverableElasticsearchException() {
+        var e = new ElasticsearchStatusException(randomAlphaOfLength(10), RestStatus.NOT_FOUND);
+        assertRetryIfUnattendedOtherwiseFail(e);
+    }
+
+    public void testHandleIndexerFailure_IllegalArgumentException() {
+        var e = new IllegalArgumentException(randomAlphaOfLength(10));
+        assertRetryIfUnattendedOtherwiseFail(e);
+    }
+
+    public void testHandleIndexerFailure_UnexpectedException() {
+        List.of(true, false).forEach((unattended) -> { assertRetry(new Exception(), unattended); });
+    }
+
+    private void assertRetryIfUnattendedOtherwiseFail(Exception e) {
+        List.of(true, false).forEach((unattended) -> {
+            if (unattended) {
+                assertRetry(e, unattended);
+            } else {
+                assertFailure(e);
+            }
+        });
+    }
+
+    private void assertRetry(Exception e, boolean unattended) {
         String transformId = randomAlphaOfLength(10);
-        SettingsConfig settings = new SettingsConfig.Builder().setUnattended(true).build();
+        SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();
 
         MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
         MockTransformContextListener contextListener = new MockTransformContextListener();
@@ -74,51 +187,33 @@ public class TransformFailureHandlerTests extends ESTestCase {
 
         TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);
 
-        handler.handleIndexerFailure(
-            new SearchPhaseExecutionException(
-                "query",
-                "Partial shards failure",
-                new ShardSearchFailure[] {
-                    new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, CircuitBreaker.Durability.TRANSIENT)) }
-            ),
-            settings
-        );
+        assertNoFailure(handler, e, contextListener, settings, true);
+        assertNoFailure(handler, e, contextListener, settings, true);
+        if (unattended) {
+            assertNoFailure(handler, e, contextListener, settings, true);
+        } else {
+            // fail after max retry attempts reached
+            assertFailure(handler, e, contextListener, settings, true);
+        }
+    }
 
-        // CBE isn't a failure, but it only affects page size(which we don't test here)
-        assertFalse(contextListener.getFailed());
-        assertEquals(0, contextListener.getFailureCountChangedCounter());
+    private void assertRetryFailureCountNotIncremented(Exception e, boolean unattended) {
+        String transformId = randomAlphaOfLength(10);
+        SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();
 
-        assertNoFailure(
-            handler,
-            new SearchPhaseExecutionException(
-                "query",
-                "Partial shards failure",
-                new ShardSearchFailure[] {
-                    new ShardSearchFailure(
-                        new ScriptException(
-                            "runtime error",
-                            new ArithmeticException("/ by zero"),
-                            singletonList("stack"),
-                            "test",
-                            "painless"
-                        )
-                    ) }
-            ),
-            contextListener,
-            settings
-        );
-        assertNoFailure(
-            handler,
-            new ElasticsearchStatusException("something really bad happened", RestStatus.INTERNAL_SERVER_ERROR),
-            contextListener,
-            settings
-        );
-        assertNoFailure(handler, new IllegalArgumentException("expected apples not oranges"), contextListener, settings);
-        assertNoFailure(handler, new RuntimeException("the s*** hit the fan"), contextListener, settings);
-        assertNoFailure(handler, new NullPointerException("NPE"), contextListener, settings);
+        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+        MockTransformContextListener contextListener = new MockTransformContextListener();
+        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+        context.setPageSize(500);
+
+        TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);
+
+        assertNoFailure(handler, e, contextListener, settings, false);
+        assertNoFailure(handler, e, contextListener, settings, false);
+        assertNoFailure(handler, e, contextListener, settings, false);
     }
 
-    public void testClusterBlock() {
+    private void assertFailure(Exception e) {
         String transformId = randomAlphaOfLength(10);
         SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).build();
 
@@ -129,32 +224,50 @@ public class TransformFailureHandlerTests extends ESTestCase {
 
         TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);
 
-        final ClusterBlockException clusterBlock = new ClusterBlockException(
-            Map.of("test-index", Set.of(MetadataIndexStateService.INDEX_CLOSED_BLOCK))
-        );
+        assertFailure(handler, e, contextListener, settings, false);
+    }
 
-        handler.handleIndexerFailure(clusterBlock, settings);
-        assertFalse(contextListener.getFailed());
-        assertEquals(1, contextListener.getFailureCountChangedCounter());
+    private void assertNoFailure(
+        TransformFailureHandler handler,
+        Exception e,
+        MockTransformContextListener mockTransformContextListener,
+        SettingsConfig settings,
+        boolean failureCountIncremented
+    ) {
+        handler.handleIndexerFailure(e, settings);
+        assertFalse(mockTransformContextListener.getFailed());
+        assertEquals(failureCountIncremented ? 1 : 0, mockTransformContextListener.getFailureCountChangedCounter());
+        mockTransformContextListener.reset();
+    }
 
-        handler.handleIndexerFailure(clusterBlock, settings);
-        assertFalse(contextListener.getFailed());
-        assertEquals(2, contextListener.getFailureCountChangedCounter());
+    private void assertNoFailureAndContextPageSizeSet(Exception e, boolean unattended, int newPageSize) {
+        String transformId = randomAlphaOfLength(10);
+        SettingsConfig settings = new SettingsConfig.Builder().setNumFailureRetries(2).setUnattended(unattended).build();
 
-        handler.handleIndexerFailure(clusterBlock, settings);
-        assertTrue(contextListener.getFailed());
-        assertEquals(3, contextListener.getFailureCountChangedCounter());
+        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+        MockTransformContextListener contextListener = new MockTransformContextListener();
+        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+        context.setPageSize(500);
+
+        TransformFailureHandler handler = new TransformFailureHandler(auditor, context, transformId);
+
+        handler.handleIndexerFailure(e, settings);
+        assertFalse(contextListener.getFailed());
+        assertEquals(0, contextListener.getFailureCountChangedCounter());
+        assertEquals(newPageSize, context.getPageSize());
+        contextListener.reset();
     }
 
-    private void assertNoFailure(
+    private void assertFailure(
         TransformFailureHandler handler,
         Exception e,
         MockTransformContextListener mockTransformContextListener,
-        SettingsConfig settings
+        SettingsConfig settings,
+        boolean failureCountChanged
     ) {
         handler.handleIndexerFailure(e, settings);
-        assertFalse(mockTransformContextListener.getFailed());
-        assertEquals(1, mockTransformContextListener.getFailureCountChangedCounter());
+        assertTrue(mockTransformContextListener.getFailed());
+        assertEquals(failureCountChanged ? 1 : 0, mockTransformContextListener.getFailureCountChangedCounter());
         mockTransformContextListener.reset();
     }