Browse Source

[Transform] Auto retry Transform start (#106243)

* [Transform] Auto retry Transform start

Currently, unattended Transforms can fail to start due to failure to
load the Config from its internal index.  This usually happens when a
Transform is created and immediately started by a system.  The error
looks like:

```
Failed to load transform configuration for transform [id]
```

Now, we will automatically retry the startup logic until the Config is
ready.

Some notes:
- We cannot determine whether a transform is unattended until its Config
  has loaded, so at this stage we assume all transforms are unattended.
- The persistent task running the transform will move into the `STARTED`
  state.  Users can stop both the persistent task and the retry logic
  using the Transform's Stop API.
- While retrying, the Transform will report `Yellow` health in the API
  and `degraded` in Kibana.  The health message will state that the
  transform is automatically retrying and include the error it encountered.
Pat Whelan 1 year ago
parent
commit
f34f5d4bc9

+ 5 - 0
docs/changelog/106243.yaml

@@ -0,0 +1,5 @@
+pr: 106243
+summary: "[Transform] Auto retry Transform start"
+area: "Transform"
+type: bug
+issues: []

+ 32 - 6
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java

@@ -14,7 +14,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
@@ -123,6 +122,20 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
         return randomTransformConfig(id, version, pivotConfig, latestConfig);
     }
 
+    /**
+     * Builds a random {@link TransformConfig} for the given id, frequency and version. Exactly one of the pivot and latest
+     * function configs is populated, picked at random; the other is left {@code null}.
+     */
+    public static TransformConfig randomTransformConfig(String id, TimeValue frequency, TransformConfigVersion version) {
+        boolean usePivot = randomBoolean();
+        PivotConfig pivotConfig = usePivot ? PivotConfigTests.randomPivotConfig() : null;
+        LatestConfig latestConfig = usePivot ? null : LatestConfigTests.randomLatestConfig();
+        return randomTransformConfig(id, frequency, version, pivotConfig, latestConfig);
+    }
+
     public static TransformConfig randomTransformConfigWithSettings(SettingsConfig settingsConfig) {
         PivotConfig pivotConfig;
         LatestConfig latestConfig;
@@ -157,12 +170,28 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
         TransformConfigVersion version,
         PivotConfig pivotConfig,
         LatestConfig latestConfig
+    ) {
+        return randomTransformConfig(
+            id,
+            randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
+            version,
+            pivotConfig,
+            latestConfig
+        );
+    }
+
+    public static TransformConfig randomTransformConfig(
+        String id,
+        TimeValue frequency,
+        TransformConfigVersion version,
+        PivotConfig pivotConfig,
+        LatestConfig latestConfig
     ) {
         return new TransformConfig(
             id,
             randomSourceConfig(),
             randomDestConfig(),
-            randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
+            frequency,
             randomBoolean() ? null : randomSyncConfig(),
             randomHeaders(),
             pivotConfig,
@@ -281,10 +310,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
     }
 
     private static Map<String, String> randomHeaders() {
-        Map<String, String> headers = Maps.newMapWithExpectedSize(1);
-        headers.put("key", "value");
-
-        return headers;
+        return Map.of("key", "value");
     }
 
     public void testDefaultMatchAll() throws IOException {

+ 34 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java

@@ -39,6 +39,9 @@ public class TransformContext {
     private final AtomicInteger statePersistenceFailureCount = new AtomicInteger();
     private final AtomicReference<Throwable> lastStatePersistenceFailure = new AtomicReference<>();
     private volatile Instant lastStatePersistenceFailureStartTime;
+    private final AtomicInteger startUpFailureCount = new AtomicInteger();
+    private final AtomicReference<Throwable> lastStartUpFailure = new AtomicReference<>();
+    private volatile Instant startUpFailureTime;
     private volatile Instant changesLastDetectedAt;
     private volatile Instant lastSearchTime;
     private volatile boolean shouldStopAtCheckpoint = false;
@@ -214,6 +217,37 @@ public class TransformContext {
         return lastStatePersistenceFailureStartTime;
     }
 
+    /**
+     * Clears all start-up failure tracking: the failure count, the last recorded failure, and the time of the first failure.
+     */
+    void resetStartUpFailureCount() {
+        startUpFailureCount.set(0);
+        lastStartUpFailure.set(null);
+        startUpFailureTime = null;
+    }
+
+    /**
+     * @return the number of start-up failures recorded since the last reset
+     */
+    int getStartUpFailureCount() {
+        return startUpFailureCount.get();
+    }
+
+    /**
+     * @return the most recently recorded start-up failure, or {@code null} if none has been recorded since the last reset
+     */
+    Throwable getStartUpFailure() {
+        return lastStartUpFailure.get();
+    }
+
+    /**
+     * Records a start-up failure.
+     *
+     * @param failure the failure to remember as the most recent start-up failure
+     * @return the failure count after this failure has been counted
+     */
+    int incrementAndGetStartUpFailureCount(Throwable failure) {
+        lastStartUpFailure.set(failure);
+        int newFailureCount = startUpFailureCount.incrementAndGet();
+        // only the first failure after a reset stamps the time, so the timestamp marks when the failures began
+        if (newFailureCount == 1) {
+            startUpFailureTime = Instant.now();
+        }
+        return newFailureCount;
+    }
+
+    /**
+     * @return when the first start-up failure since the last reset was recorded, or {@code null} if there has been none
+     */
+    Instant getStartUpFailureTime() {
+        return startUpFailureTime;
+    }
+
+    /**
+     * @return {@code true} only when the indexer failure count, the state persistence failure count and the start-up failure
+     *         count are all zero
+     */
+    boolean doesNotHaveFailures() {
+        if (getFailureCount() != 0) {
+            return false;
+        }
+        if (getStatePersistenceFailureCount() != 0) {
+            return false;
+        }
+        return getStartUpFailureCount() == 0;
+    }
+
     void shutdown() {
         taskListener.shutdown();
     }

+ 21 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformHealthChecker.java

@@ -38,7 +38,8 @@ public final class TransformHealthChecker {
         PRIVILEGES_CHECK_FAILED("Privileges check failed"),
         TRANSFORM_TASK_FAILED("Transform task state is [failed]"),
         TRANSFORM_INDEXER_FAILED("Transform indexer failed"),
-        TRANSFORM_INTERNAL_STATE_UPDATE_FAILED("Task encountered failures updating internal state");
+        TRANSFORM_INTERNAL_STATE_UPDATE_FAILED("Task encountered failures updating internal state"),
+        TRANSFORM_STARTUP_FAILED("Transform task is automatically retrying its startup process");
 
         private final String issue;
 
@@ -88,8 +89,7 @@ public final class TransformHealthChecker {
     public static TransformHealth checkTransform(TransformTask transformTask, @Nullable AuthorizationState authState) {
         // quick check
         if (TransformTaskState.FAILED.equals(transformTask.getState().getTaskState()) == false
-            && transformTask.getContext().getFailureCount() == 0
-            && transformTask.getContext().getStatePersistenceFailureCount() == 0
+            && transformTask.getContext().doesNotHaveFailures()
             && AuthorizationState.isNullOrGreen(authState)) {
             return TransformHealth.GREEN;
         }
@@ -145,6 +145,24 @@ public final class TransformHealthChecker {
             );
         }
 
+        // Report start-up retries as YELLOW (never overriding RED) together with the latest failure.
+        // The failure is read once and null-checked: a retry that succeeds on another thread resets the start-up failure
+        // state, and dereferencing a failure that was cleared after the count check would throw.
+        var lastFailure = transformContext.getStartUpFailure();
+        if (lastFailure != null && transformContext.getStartUpFailureCount() != 0) {
+            if (HealthStatus.RED.equals(maxStatus) == false) {
+                maxStatus = HealthStatus.YELLOW;
+            }
+
+            var lastFailureMessage = lastFailure instanceof ElasticsearchException elasticsearchException
+                ? elasticsearchException.getDetailedMessage()
+                : lastFailure.getMessage();
+            issues.add(
+                IssueType.TRANSFORM_STARTUP_FAILED.newIssue(
+                    lastFailureMessage,
+                    transformContext.getStartUpFailureCount(),
+                    transformContext.getStartUpFailureTime()
+                )
+            );
+        }
+
         return new TransformHealth(maxStatus, issues);
     }
 

+ 66 - 9
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

@@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
+import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
 import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.TransformExtension;
@@ -203,6 +204,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
 
         final SetOnce<TransformState> stateHolder = new SetOnce<>();
 
+        // <7> log the start result
         ActionListener<StartTransformAction.Response> startTaskListener = ActionListener.wrap(
             response -> logger.info("[{}] successfully completed and scheduled task in node operation", transformId),
             failure -> {
@@ -348,21 +350,18 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         });
 
         // <2> Get the transform config
-        ActionListener<Void> templateCheckListener = ActionListener.wrap(
-            aVoid -> transformServices.getConfigManager().getTransformConfiguration(transformId, getTransformConfigListener),
-            error -> {
-                Throwable cause = ExceptionsHelper.unwrapCause(error);
-                String msg = "Failed to create internal index mappings";
-                markAsFailed(buildTask, error, msg + "[" + cause + "]");
-            }
-        );
+        var templateCheckListener = getTransformConfig(buildTask, params, getTransformConfigListener);
 
         // <1> Check the latest internal index (IMPORTANT: according to _this_ node, which might be newer than master) is installed
         TransformInternalIndex.createLatestVersionedIndexIfRequired(
             clusterService,
             parentTaskClient,
             transformExtension.getTransformInternalIndexAdditionalSettings(),
-            templateCheckListener
+            templateCheckListener.delegateResponse((l, e) -> {
+                Throwable cause = ExceptionsHelper.unwrapCause(e);
+                String msg = "Failed to create internal index mappings";
+                markAsFailed(buildTask, e, msg + "[" + cause + "]");
+            })
         );
     }
 
@@ -401,6 +400,64 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         }
     }
 
+    /**
+     * Builds the listener that, once run, loads the transform's config through a retryable start-up listener registered with the
+     * TransformScheduler.
+     *
+     * If loading fails the first time it is retried indefinitely, because at this point we cannot yet tell whether the transform
+     * is unattended. The scheduler registration is removed right before {@code listener} is notified.
+     */
+    private ActionListener<Void> getTransformConfig(
+        TransformTask task,
+        TransformTaskParams params,
+        ActionListener<TransformConfig> listener
+    ) {
+        return ActionListener.running(() -> {
+            var transformId = params.getId();
+            var scheduler = transformServices.getScheduler();
+            // deregister from the scheduler before the result is handed back, so no further retries are triggered
+            var deregisteringListener = ActionListener.runBefore(listener, () -> scheduler.deregisterTransform(transformId));
+            var retryableConfigLoad = new TransformRetryableStartUpListener<TransformConfig>(
+                transformId,
+                configListener -> transformServices.getConfigManager().getTransformConfiguration(transformId, configListener),
+                deregisteringListener,
+                retryListener(task),
+                () -> true, // always retry: whether the transform is unattended is not known yet
+                task.getContext()
+            );
+            scheduler.registerTransform(params, retryableConfigLoad);
+        });
+    }
+
+    /**
+     * Listener notified once, after the first execution of a {@link TransformRetryableStartUpListener}.
+     *
+     * A {@code true} result means the first call failed and retries are under way: the task state is persisted as Started with
+     * a retry reason, which unblocks the network thread and acknowledges the user with a 200 OK.
+     *
+     * A {@code false} result means the first call succeeded, so there is nothing for this listener to do.
+     *
+     * If handling the result fails, the task is marked as failed.
+     */
+    private ActionListener<Boolean> retryListener(TransformTask task) {
+        return ActionListener.wrap(isRetrying -> {
+            if (isRetrying == false) {
+                return;
+            }
+            // copy the current state, changing only the task state and the reason
+            var currentState = task.getState();
+            var retryingState = new TransformState(
+                TransformTaskState.STARTED,
+                currentState.getIndexerState(),
+                currentState.getPosition(),
+                currentState.getCheckpoint(),
+                "Retrying transform start.",
+                currentState.getProgress(),
+                currentState.getNode(),
+                currentState.shouldStopAtNextCheckpoint(),
+                currentState.getAuthState()
+            );
+            task.persistStateToClusterState(
+                retryingState,
+                ActionListener.wrap(
+                    persisted -> logger.debug("[{}] marked as retrying in TransformState.", task.getTransformId()),
+                    persistFailure -> logger.atWarn()
+                        .withThrowable(persistFailure)
+                        .log("[{}] failed to persist state.", task.getTransformId())
+                )
+            );
+        }, e -> markAsFailed(task, e, "Failed to initiate retries for Transform."));
+    }
+
     private void startTask(
         TransformTask buildTask,
         ClientTransformIndexerBuilder indexerBuilder,

+ 102 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListener.java

@@ -0,0 +1,102 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.transforms;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Runs a start-up action each time the {@link TransformScheduler} triggers this listener, until the action succeeds or
+ * {@code shouldRetry} reports that retries must stop. While retrying, failures are recorded in the {@link TransformContext};
+ * the recorded failures are cleared once a final outcome is reached.
+ *
+ * At most one attempt is outstanding at a time: a trigger that arrives before the previous attempt has reported its result is
+ * ignored, so the wrapped listeners are not completed by overlapping attempts.
+ */
+class TransformRetryableStartUpListener<Response> implements TransformScheduler.Listener {
+    private final String transformId;
+    private final Consumer<ActionListener<Response>> action;
+    private final ActionListener<Response> actionListener;
+    private final ActionListener<Boolean> retryScheduledListener;
+    private final Supplier<Boolean> shouldRetry;
+    private final TransformContext context;
+    private final AtomicBoolean isFirstRun;
+    private final AtomicBoolean isRunning;
+    // true while an attempt is outstanding, so a trigger arriving before it completes does not start a second attempt
+    private final AtomicBoolean isActionInFlight;
+
+    /**
+     * @param transformId the transform associated with this listener. All events to this listener must be for the same transformId.
+     * @param action the action this listener will take. When the TransformScheduler invokes {@link #triggered(TransformScheduler.Event)},
+     *               the call is forwarded to this action.
+     * @param actionListener actionListener will be notified via #onResponse when the action succeeds or via #onFailure when retries have
+     *                       stopped. If the Transform Stop API deregisters this class from the Scheduler, this actionListener will *not* be
+     *                       invoked.
+     * @param retryScheduledListener retryScheduledListener will be notified after the first call. If true, another thread has started the
+     *                               retry process. If false, the original call was successful, and no retries will happen.
+     * @param shouldRetry allows an external entity to gracefully stop these retries, invoking the actionListener's #onFailure method.
+     *                    Note that external entities are still required to deregister this listener from the Scheduler.
+     * @param context the transform's context object. This listener will update the StartUpFailureCount information in the context as it
+     *                encounters errors and retries.
+     */
+    TransformRetryableStartUpListener(
+        String transformId,
+        Consumer<ActionListener<Response>> action,
+        ActionListener<Response> actionListener,
+        ActionListener<Boolean> retryScheduledListener,
+        Supplier<Boolean> shouldRetry,
+        TransformContext context
+    ) {
+        this.transformId = transformId;
+        this.action = action;
+        this.actionListener = actionListener;
+        this.retryScheduledListener = retryScheduledListener;
+        this.shouldRetry = shouldRetry;
+        this.context = context;
+        this.isFirstRun = new AtomicBoolean(true);
+        this.isRunning = new AtomicBoolean(true);
+        this.isActionInFlight = new AtomicBoolean(false);
+    }
+
+    /**
+     * Starts one attempt of the action, provided the event is for this transform, no final outcome has been reached yet, and no
+     * earlier attempt is still outstanding.
+     */
+    @Override
+    public void triggered(TransformScheduler.Event event) {
+        if (isRunning.get() && transformId.equals(event.transformId()) && isActionInFlight.compareAndSet(false, true)) {
+            try {
+                action.accept(ActionListener.wrap(this::actionSucceeded, this::actionFailed));
+            } catch (RuntimeException e) {
+                // the action threw instead of reporting through its listener: release the guard so the next trigger can retry
+                isActionInFlight.set(false);
+                throw e;
+            }
+        }
+    }
+
+    /** Stops further attempts and, the first time it is called, clears the start-up failures recorded in the context. */
+    private void markDone() {
+        if (isRunning.compareAndSet(true, false)) {
+            synchronized (context) {
+                context.resetStartUpFailureCount();
+            }
+        }
+    }
+
+    private void actionSucceeded(Response r) {
+        maybeNotifyRetryListener(false);
+        markDone();
+        actionListener.onResponse(r);
+    }
+
+    /** Notifies the retry listener, but only for the first completed attempt. */
+    private void maybeNotifyRetryListener(boolean response) {
+        if (isFirstRun.compareAndSet(true, false)) {
+            retryScheduledListener.onResponse(response);
+        }
+    }
+
+    private void actionFailed(Exception e) {
+        if (shouldRetry.get()) {
+            try {
+                maybeNotifyRetryListener(true);
+                recordError(e);
+            } finally {
+                // this attempt is over: allow the next trigger to start a new one
+                isActionInFlight.set(false);
+            }
+        } else {
+            maybeNotifyRetryListener(false);
+            markDone();
+            actionListener.onFailure(e);
+        }
+    }
+
+    private void recordError(Exception e) {
+        synchronized (context) {
+            context.incrementAndGetStartUpFailureCount(e);
+        }
+    }
+}

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

@@ -71,7 +71,7 @@ public class TransformTask extends AllocatedPersistentTask implements TransformS
     private final SetOnce<ClientTransformIndexer> indexer = new SetOnce<>();
 
     @SuppressWarnings("this-escape")
-    public TransformTask(
+    TransformTask(
         long id,
         String type,
         String action,

+ 67 - 3
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java

@@ -21,9 +21,11 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 public class TransformContextTests extends ESTestCase {
@@ -41,19 +43,81 @@ public class TransformContextTests extends ESTestCase {
     }
 
     public void testFailureCount() {
-        TransformContext context = new TransformContext(null, null, 0, listener);
-        assertThat(context.incrementAndGetFailureCount(new RuntimeException("some_exception")), is(equalTo(1)));
+        var context = new TransformContext(null, null, 0, listener);
+
+        var someException = someException();
+        assertThat(context.incrementAndGetFailureCount(someException), is(equalTo(1)));
         assertThat(context.getFailureCount(), is(equalTo(1)));
-        assertThat(context.incrementAndGetFailureCount(new IllegalArgumentException("some_other_exception")), is(equalTo(2)));
+        assertThat(context.getLastFailure(), is(sameInstance(someException)));
+        assertFalse(context.doesNotHaveFailures());
+
+        var someOtherException = someOtherException();
+        assertThat(context.incrementAndGetFailureCount(someOtherException), is(equalTo(2)));
         assertThat(context.getFailureCount(), is(equalTo(2)));
+        assertThat(context.getLastFailure(), is(sameInstance(someOtherException)));
+        assertFalse(context.doesNotHaveFailures());
+
         context.resetReasonAndFailureCounter();
         assertThat(context.getFailureCount(), is(equalTo(0)));
         assertThat(context.getLastFailure(), is(nullValue()));
+        assertTrue(context.doesNotHaveFailures());
 
         // Verify that the listener is notified every time the failure count is incremented or reset
         verify(listener, times(3)).failureCountChanged();
     }
 
+    /** Creates the first of two distinguishable failures used by the failure-count tests. */
+    private Throwable someException() {
+        return new RuntimeException("some_exception");
+    }
+
+    /** Creates a second failure, of a different type and message than {@code someException()}. */
+    private Throwable someOtherException() {
+        return new IllegalArgumentException("some_other_exception");
+    }
+
+    /**
+     * State persistence failures are counted, expose the most recent failure, clear on reset, and never notify the listener.
+     */
+    public void testStatePersistenceFailureCount() {
+        var context = new TransformContext(null, null, 0, listener);
+
+        // first failure
+        var firstFailure = someException();
+        assertThat(context.incrementAndGetStatePersistenceFailureCount(firstFailure), is(equalTo(1)));
+        assertThat(context.getStatePersistenceFailureCount(), is(equalTo(1)));
+        assertThat(context.getLastStatePersistenceFailure(), is(sameInstance(firstFailure)));
+        assertFalse(context.doesNotHaveFailures());
+
+        // second failure replaces the first as the last failure
+        var secondFailure = someOtherException();
+        assertThat(context.incrementAndGetStatePersistenceFailureCount(secondFailure), is(equalTo(2)));
+        assertThat(context.getStatePersistenceFailureCount(), is(equalTo(2)));
+        assertThat(context.getLastStatePersistenceFailure(), is(sameInstance(secondFailure)));
+        assertFalse(context.doesNotHaveFailures());
+
+        // reset clears everything
+        context.resetStatePersistenceFailureCount();
+        assertThat(context.getStatePersistenceFailureCount(), is(equalTo(0)));
+        assertThat(context.getLastStatePersistenceFailure(), is(nullValue()));
+        assertTrue(context.doesNotHaveFailures());
+        verifyNoInteractions(listener);
+    }
+
+    /**
+     * Start-up failures are counted, expose the most recent failure, clear on reset, and never notify the listener.
+     */
+    public void testStartUpFailureCount() {
+        var context = new TransformContext(null, null, 0, listener);
+
+        // first failure
+        var firstFailure = someException();
+        assertThat(context.incrementAndGetStartUpFailureCount(firstFailure), is(equalTo(1)));
+        assertThat(context.getStartUpFailureCount(), is(equalTo(1)));
+        assertThat(context.getStartUpFailure(), is(sameInstance(firstFailure)));
+        assertFalse(context.doesNotHaveFailures());
+
+        // second failure replaces the first as the last failure
+        var secondFailure = someOtherException();
+        assertThat(context.incrementAndGetStartUpFailureCount(secondFailure), is(equalTo(2)));
+        assertThat(context.getStartUpFailureCount(), is(equalTo(2)));
+        assertThat(context.getStartUpFailure(), is(sameInstance(secondFailure)));
+        assertFalse(context.doesNotHaveFailures());
+
+        // reset clears everything
+        context.resetStartUpFailureCount();
+        assertThat(context.getStartUpFailureCount(), is(equalTo(0)));
+        assertThat(context.getStartUpFailure(), is(nullValue()));
+        assertTrue(context.doesNotHaveFailures());
+        verifyNoInteractions(listener);
+    }
+
     public void testCheckpoint() {
         TransformContext context = new TransformContext(null, null, 13, listener);
         assertThat(context.getCheckpoint(), is(equalTo(13L)));

+ 25 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformHealthCheckerTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.stream.IntStream;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -101,6 +102,30 @@ public class TransformHealthCheckerTests extends ESTestCase {
         assertThat(TransformHealthChecker.checkTransform(task), equalTo(TransformHealth.GREEN));
     }
 
+    /**
+     * Start-up failures make the transform report YELLOW with a single issue, stay YELLOW however many failures accumulate,
+     * and return to GREEN once the start-up failure state is reset.
+     */
+    public void testStartUpFailures() {
+        var task = mock(TransformTask.class);
+        var context = createTestContext();
+        var now = getNow();
+
+        withIdStateAndContext(task, randomAlphaOfLength(10), context);
+        assertThat(TransformHealthChecker.checkTransform(task), equalTo(TransformHealth.GREEN));
+
+        context.incrementAndGetStartUpFailureCount(new ElasticsearchException("failed to persist"));
+
+        var health = TransformHealthChecker.checkTransform(task);
+        assertThat(health.getStatus(), equalTo(HealthStatus.YELLOW));
+        assertEquals(1, health.getIssues().size());
+        assertThat(health.getIssues().get(0).getIssue(), equalTo("Transform task is automatically retrying its startup process"));
+        assertThat(health.getIssues().get(0).getFirstOccurrence(), greaterThanOrEqualTo(now));
+        assertThat(health.getIssues().get(0).getFirstOccurrence(), lessThan(Instant.MAX));
+
+        IntStream.range(0, 10).forEach(i -> context.incrementAndGetStartUpFailureCount(new ElasticsearchException("failed to persist")));
+        // re-run the check: the health object computed above predates these failures, so asserting on it proves nothing
+        assertThat(
+            "Start up failures should always be yellow regardless of count",
+            TransformHealthChecker.checkTransform(task).getStatus(),
+            equalTo(HealthStatus.YELLOW)
+        );
+
+        context.resetStartUpFailureCount();
+        assertThat(TransformHealthChecker.checkTransform(task), equalTo(TransformHealth.GREEN));
+    }
+
     private TransformContext createTestContext() {
         return new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
     }

+ 151 - 28
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.transform.transforms;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -24,6 +25,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexVersion;
@@ -31,33 +33,74 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
+import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
+import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
+import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
 import org.elasticsearch.xpack.transform.DefaultTransformExtension;
 import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
-import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
+import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager;
+import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
 import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;
 import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 import java.time.Clock;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TransformPersistentTasksExecutorTests extends ESTestCase {
+    private static ThreadPool threadPool;
+
+    /**
+     * Creates the thread pool shared by the tests in this class. Every named executor resolves to
+     * {@link EsExecutors#DIRECT_EXECUTOR_SERVICE}, and scheduled commands are run right away instead of after their delay.
+     */
+    @BeforeClass
+    public static void setUpThreadPool() {
+        var poolName = TransformPersistentTasksExecutorTests.class.getSimpleName();
+        threadPool = new TestThreadPool(poolName) {
+            @Override
+            public ExecutorService executor(String name) {
+                // the requested executor name is ignored
+                return EsExecutors.DIRECT_EXECUTOR_SERVICE;
+            }
+
+            @Override
+            public ScheduledCancellable schedule(Runnable command, TimeValue delay, Executor name) {
+                // the delay and executor are ignored; no cancellable handle is returned
+                command.run();
+                return null;
+            }
+        };
+    }
+
+    /** Terminates the thread pool shared by the tests in this class once they have all run. */
+    @AfterClass
+    public static void tearDownThreadPool() {
+        terminate(threadPool);
+    }
 
     public void testNodeVersionAssignment() {
         DiscoveryNodes.Builder nodes = buildNodes(false, true, true, true, true);
@@ -262,6 +305,88 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         assertEquals(indexToRemove, result.get(0));
     }
 
+    /**
+     * With the transform configuration already stored in the config manager, running {@code nodeOperation}
+     * is expected to start the task.
+     */
+    public void testNodeOperation() {
+        var transformId = "testNodeOperation";
+
+        // store the configuration up front so the executor can load it
+        var configManager = new InMemoryTransformConfigManager();
+        putTransformConfiguration(configManager, transformId);
+
+        var scheduler = new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO);
+        var services = transformServices(configManager, scheduler);
+        var executor = buildTaskExecutor(services);
+
+        var taskParams = mockTaskParams(transformId);
+        var transformTask = mockTransformTask();
+        executor.nodeOperation(transformTask, taskParams, mock());
+
+        verify(transformTask).start(isNull(), any());
+    }
+
+    /**
+     * Stores a random transform configuration for {@code transformId} in {@code configManager}.
+     * The test is failed if the config manager reports an error.
+     */
+    private void putTransformConfiguration(TransformConfigManager configManager, String transformId) {
+        var randomConfig = TransformConfigTests.randomTransformConfig(
+            transformId,
+            TimeValue.timeValueMillis(1),
+            TransformConfigVersion.CURRENT
+        );
+        var failOnError = ActionListener.<Boolean>noop().delegateResponse((l, e) -> fail(e));
+        configManager.putTransformConfiguration(randomConfig, failOnError);
+    }
+
+    /**
+     * Verifies the start-up retry path of {@code nodeOperation}.
+     * <p>
+     * The config manager used here fails the first {@code getTransformConfiguration} call and delegates every later call
+     * to the in-memory implementation. After the scheduler is asked to run the transform again, the task is expected to have
+     * persisted a {@code STARTED} task state with a {@code STOPPED} indexer state and to have been started.
+     */
+    public void testNodeOperationStartupRetry() throws Exception {
+        var failFirstCall = new AtomicBoolean(true);
+        var transformsConfigManager = new InMemoryTransformConfigManager() {
+            @Override
+            public void getTransformConfiguration(String transformId, ActionListener<TransformConfig> resultListener) {
+                // only the very first lookup fails; compareAndSet flips the flag so later lookups succeed
+                if (failFirstCall.compareAndSet(true, false)) {
+                    resultListener.onFailure(new IllegalStateException("Failing first call."));
+                } else {
+                    super.getTransformConfiguration(transformId, resultListener);
+                }
+            }
+        };
+
+        var transformScheduler = new TransformScheduler(Clock.systemUTC(), threadPool, fastRetry(), TimeValue.ZERO);
+        var taskExecutor = buildTaskExecutor(transformServices(transformsConfigManager, transformScheduler));
+
+        var transformId = "testNodeOperationStartupRetry";
+        var params = mockTaskParams(transformId);
+        putTransformConfiguration(transformsConfigManager, transformId);
+
+        var task = mockTransformTask();
+        taskExecutor.nodeOperation(task, params, mock());
+
+        // skip waiting for the scheduler to run the task a second time and just rerun it now
+        transformScheduler.scheduleNow(transformId);
+
+        // verify the retry listener set the state to TransformTaskState.STARTED + IndexerState.STOPPED
+        verify(task).persistStateToClusterState(argThat(state -> {
+            // actual value first, matcher on the expected value second, so a failure reports them the right way round
+            assertThat(state.getTaskState(), equalTo(TransformTaskState.STARTED));
+            assertThat(state.getIndexerState(), equalTo(IndexerState.STOPPED));
+            return true;
+        }), any());
+        verify(task).start(isNull(), any());
+    }
+
+    /**
+     * Builds settings that set the transform scheduler frequency to one second.
+     */
+    private Settings fastRetry() {
+        // must be >= [1s]
+        var frequencyKey = Transform.SCHEDULER_FREQUENCY.getKey();
+        var oneSecond = TimeValue.timeValueSeconds(1);
+        return Settings.builder().put(frequencyKey, oneSecond).build();
+    }
+
+    /**
+     * Creates mocked task params that report the given transform id and a frequency of one second.
+     */
+    private TransformTaskParams mockTaskParams(String transformId) {
+        var taskParams = mock(TransformTaskParams.class);
+        when(taskParams.getFrequency()).thenReturn(TimeValue.timeValueSeconds(1));
+        when(taskParams.getId()).thenReturn(transformId);
+        return taskParams;
+    }
+
+    /**
+     * Creates a mocked {@link TransformTask} whose state is STOPPED / STOPPED and whose fluent setters return the mock itself.
+     * Any call to {@code fail(Throwable, String, ...)} on the mock fails the running test with the reported throwable.
+     */
+    private TransformTask mockTransformTask() {
+        var stoppedState = new TransformState(
+            TransformTaskState.STOPPED,
+            IndexerState.STOPPED,
+            null,
+            0,
+            null,
+            null,
+            null,
+            false,
+            null
+        );
+
+        var mockedTask = mock(TransformTask.class);
+        when(mockedTask.getState()).thenReturn(stoppedState);
+        when(mockedTask.getContext()).thenReturn(mock());
+        when(mockedTask.getParentTaskId()).thenReturn(TaskId.EMPTY_TASK_ID);
+        // fluent setters hand back the same mock so chained calls keep working
+        when(mockedTask.setAuthState(any(AuthorizationState.class))).thenReturn(mockedTask);
+        when(mockedTask.setNumFailureRetries(anyInt())).thenReturn(mockedTask);
+        doAnswer(a -> fail(a.getArgument(0, Throwable.class))).when(mockedTask).fail(any(Throwable.class), any(String.class), any());
+        return mockedTask;
+    }
+
     private void addIndices(Metadata.Builder metadata, RoutingTable.Builder routingTable) {
         List<String> indices = new ArrayList<>();
         indices.add(TransformInternalIndexConstants.AUDIT_INDEX);
@@ -415,23 +540,20 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         csBuilder.metadata(metadata);
 
         return csBuilder.build();
-
     }
 
     private TransformPersistentTasksExecutor buildTaskExecutor() {
-        ClusterService clusterService = mock(ClusterService.class);
-        Client client = mock(Client.class);
-        TransformAuditor mockAuditor = mock(TransformAuditor.class);
-        IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(
-            clusterService,
-            TestIndexNameExpressionResolver.newInstance(),
-            client,
-            xContentRegistry()
+        var transformServices = transformServices(
+            new InMemoryTransformConfigManager(),
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
         );
-        Clock clock = Clock.systemUTC();
-        ThreadPool threadPool = mock(ThreadPool.class);
-        TransformCheckpointService transformCheckpointService = new TransformCheckpointService(
-            clock,
+        return buildTaskExecutor(transformServices);
+    }
+
+    private TransformServices transformServices(TransformConfigManager configManager, TransformScheduler scheduler) {
+        var mockAuditor = mock(TransformAuditor.class);
+        var transformCheckpointService = new TransformCheckpointService(
+            Clock.systemUTC(),
             Settings.EMPTY,
             new ClusterService(
                 Settings.EMPTY,
@@ -439,28 +561,29 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
                 null,
                 (TaskManager) null
             ),
-            transformsConfigManager,
+            configManager,
             mockAuditor
         );
-        TransformServices transformServices = new TransformServices(
-            transformsConfigManager,
-            transformCheckpointService,
-            mockAuditor,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
-        );
-
-        ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(Transform.NUM_FAILURE_RETRIES_SETTING));
-        when(clusterService.getClusterSettings()).thenReturn(cSettings);
-        when(clusterService.state()).thenReturn(TransformInternalIndexTests.randomTransformClusterState());
+        return new TransformServices(configManager, transformCheckpointService, mockAuditor, scheduler);
+    }
 
+    private TransformPersistentTasksExecutor buildTaskExecutor(TransformServices transformServices) {
         return new TransformPersistentTasksExecutor(
-            client,
+            mock(Client.class),
             transformServices,
             threadPool,
-            clusterService,
+            clusterService(),
             Settings.EMPTY,
             new DefaultTransformExtension(),
             TestIndexNameExpressionResolver.newInstance()
         );
     }
+
+    /**
+     * Creates a mocked {@link ClusterService} that serves cluster settings containing only the transform
+     * failure-retries setting, plus a random transform cluster state.
+     */
+    private ClusterService clusterService() {
+        var retrySettings = new ClusterSettings(Settings.EMPTY, Set.of(Transform.NUM_FAILURE_RETRIES_SETTING));
+        var mockedService = mock(ClusterService.class);
+        when(mockedService.state()).thenReturn(TransformInternalIndexTests.randomTransformClusterState());
+        when(mockedService.getClusterSettings()).thenReturn(retrySettings);
+        return mockedService;
+    }
 }

+ 239 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformRetryableStartUpListenerTests.java

@@ -0,0 +1,239 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.transforms;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+/**
+ * Unit tests for {@link TransformRetryableStartUpListener}.
+ * <p>
+ * Each test wraps a start-up action in a listener, triggers it with scheduler events, and then checks which of the
+ * response listener, the retry listener and the {@link TransformContext} start-up failure counters were invoked.
+ * <p>
+ * Conventions used by the helpers: the response counter is {@code 0} when the response listener was never called,
+ * {@code 1} after {@code onResponse} and {@code -1} after {@code onFailure}; the retry reference stays {@code null}
+ * until the retry listener receives a value.
+ */
+public class TransformRetryableStartUpListenerTests extends ESTestCase {
+    /**
+     * When the action succeeds on the first try
+     * Then we invoke the retryListener with "false" and then invoke the actionListener's onResponse.
+     */
+    public void testFirstRunPasses() {
+        var retryResult = new AtomicReference<Boolean>();
+        var responseResult = new AtomicInteger(0);
+        var context = mock(TransformContext.class);
+
+        var listener = new TransformRetryableStartUpListener<>(
+            "transformId",
+            immediatelyReturn(),
+            responseListener(responseResult),
+            retryListener(retryResult),
+            () -> true,
+            context
+        );
+
+        callThreeTimes("transformId", listener);
+
+        // assert only 1 success and no retries
+        assertEquals("Response Listener should only be called once.", 1, responseResult.get());
+        assertNotNull("Retry Listener should be called.", retryResult.get());
+        assertFalse("Retries should not be scheduled.", retryResult.get());
+        verify(context, only()).resetStartUpFailureCount();
+    }
+
+    /**
+     * An action that completes successfully as soon as it is invoked.
+     */
+    private Consumer<ActionListener<Void>> immediatelyReturn() {
+        return l -> l.onResponse(null);
+    }
+
+    /**
+     * A response listener that records its outcome in {@code result}: {@code 1} for a response, {@code -1} for a failure.
+     * The test is failed if the listener is completed while {@code result} is no longer {@code 0}.
+     */
+    private ActionListener<Void> responseListener(AtomicInteger result) {
+        return ActionListener.wrap(r -> {
+            if (result.compareAndSet(0, 1) == false) {
+                fail("Response Listener should only be called at most once for every test.");
+            }
+        }, e -> {
+            if (result.compareAndSet(0, -1) == false) {
+                fail("Response Listener should only be called at most once for every test.");
+            }
+        });
+    }
+
+    /**
+     * A retry listener that stores the value it receives in {@code result} and fails the test if it is given a failure.
+     */
+    private ActionListener<Boolean> retryListener(AtomicReference<Boolean> result) {
+        return ActionListener.wrap(result::set, e -> fail("Retry Listener is never expected to fail."));
+    }
+
+    /**
+     * Triggers the listener three times in a row with events for {@code transformId}.
+     */
+    private void callThreeTimes(String transformId, TransformRetryableStartUpListener<?> listener) {
+        listener.triggered(event(transformId));
+        listener.triggered(event(transformId));
+        listener.triggered(event(transformId));
+    }
+
+    /**
+     * Creates a scheduler event for {@code transformId}, using the current system time for both timestamps.
+     */
+    private TransformScheduler.Event event(String transformId) {
+        return new TransformScheduler.Event(transformId, System.currentTimeMillis(), System.currentTimeMillis());
+    }
+
+    /**
+     * When the action fails once then succeeds on the second try
+     * Then we invoke the retryListener with "true" and then invoke the actionListener's onResponse.
+     */
+    public void testFirstRunFails() {
+        var retryResult = new AtomicReference<Boolean>();
+        var responseResult = new AtomicInteger(0);
+        var context = mock(TransformContext.class);
+
+        var listener = new TransformRetryableStartUpListener<>(
+            "transformId",
+            failOnceThen(immediatelyReturn()),
+            responseListener(responseResult),
+            retryListener(retryResult),
+            () -> true,
+            context
+        );
+
+        callThreeTimes("transformId", listener);
+
+        // assert only 1 retry and 1 success
+        assertEquals("Response Listener should only be called once.", 1, responseResult.get());
+        assertNotNull("Retry Listener should be called.", retryResult.get());
+        assertTrue("Retries should be scheduled.", retryResult.get());
+        verify(context, times(1)).incrementAndGetStartUpFailureCount(any(IllegalStateException.class));
+        verify(context, times(1)).resetStartUpFailureCount();
+    }
+
+    /**
+     * An action that fails with an {@link IllegalStateException} the first time it is invoked and delegates to
+     * {@code followup} on every later invocation.
+     */
+    private Consumer<ActionListener<Void>> failOnceThen(Consumer<ActionListener<Void>> followup) {
+        var firstRun = new AtomicBoolean(true);
+        return l -> {
+            if (firstRun.compareAndSet(true, false)) {
+                l.onFailure(new IllegalStateException("first call fails"));
+            } else {
+                followup.accept(l);
+            }
+        };
+    }
+
+    /**
+     * When the TransformRetryableStartUpListener is never invoked
+     * Then there should be no failures to report
+     */
+    public void testUnusedRetryableIsNotReported() {
+        var context = mock(TransformContext.class);
+
+        // the listener is only constructed, never triggered
+        new TransformRetryableStartUpListener<>(
+            "transformId",
+            failOnceThen(immediatelyReturn()),
+            responseListener(),
+            retryListener(),
+            () -> true,
+            context
+        );
+
+        verifyNoInteractions(context);
+    }
+
+    /**
+     * A retry listener whose recorded result is not inspected by the test.
+     */
+    private ActionListener<Boolean> retryListener() {
+        return retryListener(new AtomicReference<>());
+    }
+
+    /**
+     * A response listener whose recorded result is not inspected by the test.
+     */
+    private ActionListener<Void> responseListener() {
+        return responseListener(new AtomicInteger());
+    }
+
+    /**
+     * Given one transformId
+     * When we receive an event for another transformId
+     * Then we should not take any action
+     */
+    public void testWrongTransformIdIsIgnored() {
+        var correctTransformId = "transformId";
+        var incorrectTransformId = "someOtherTransformId";
+        var retryResult = new AtomicReference<Boolean>();
+        var responseResult = new AtomicInteger(0);
+        var context = mock(TransformContext.class);
+
+        var listener = new TransformRetryableStartUpListener<>(
+            correctTransformId,
+            failOnceThen(immediatelyReturn()),
+            responseListener(responseResult),
+            retryListener(retryResult),
+            () -> true,
+            context
+        );
+
+        listener.triggered(event(incorrectTransformId));
+
+        assertEquals("Response Listener should never be called.", 0, responseResult.get());
+        assertNull("Retry Listener should not be called.", retryResult.get());
+        verifyNoInteractions(context);
+    }
+
+    /**
+     * Given an action that always fails
+     * When shouldRetry returns true and then false
+     * Then we should call the actionListener's onFailure handler
+     */
+    public void testCancelRetries() {
+        var retryResult = new AtomicReference<Boolean>();
+        var responseResult = new AtomicInteger(0);
+        var context = mock(TransformContext.class);
+        // compareAndSet makes the shouldRetry supplier answer true once and false afterwards
+        var runTwice = new AtomicBoolean(true);
+
+        var listener = new TransformRetryableStartUpListener<>(
+            "transformId",
+            alwaysFail(),
+            responseListener(responseResult),
+            retryListener(retryResult),
+            () -> runTwice.compareAndSet(true, false),
+            context
+        );
+
+        callThreeTimes("transformId", listener);
+
+        // assert only 1 retry and 1 failure
+        assertEquals("Response Listener should only be called once.", -1, responseResult.get());
+        assertNotNull("Retry Listener should be called.", retryResult.get());
+        assertTrue("Retries should be scheduled.", retryResult.get());
+        verify(context, times(1)).incrementAndGetStartUpFailureCount(any(IllegalStateException.class));
+        verify(context, times(1)).resetStartUpFailureCount();
+    }
+
+    /**
+     * An action that fails with an {@link IllegalStateException} every time it is invoked.
+     */
+    private Consumer<ActionListener<Void>> alwaysFail() {
+        return l -> l.onFailure(new IllegalStateException("always fail"));
+    }
+
+    /**
+     * Given an action that always fails
+     * When shouldRetry returns false
+     * Then we should call the actionListener's onFailure handler and the retryListener with "false"
+     */
+    public void testCancelRetryImmediately() {
+        var retryResult = new AtomicReference<Boolean>();
+        var responseResult = new AtomicInteger(0);
+        var context = mock(TransformContext.class);
+
+        var listener = new TransformRetryableStartUpListener<>(
+            "transformId",
+            alwaysFail(),
+            responseListener(responseResult),
+            retryListener(retryResult),
+            () -> false,
+            context
+        );
+
+        callThreeTimes("transformId", listener);
+
+        // assert no retries and 1 failure
+        assertEquals("Response Listener should only be called once.", -1, responseResult.get());
+        assertNotNull("Retry Listener should be called.", retryResult.get());
+        assertFalse("Retries should not be scheduled.", retryResult.get());
+        verify(context, only()).resetStartUpFailureCount();
+    }
+}