Prechádzať zdrojové kódy

[Transform] Check node shutdown before fail (#107358)

Transforms continue to run even when a node is shutting down. This may
lead to a transform failing and putting itself into a failed state,
which will prevent it from restarting when the node comes back online.

If the node is shutting down when a transform fails, the transform will now
abort rather than fail, which leaves it in a started state. When the node
comes back online, or another node in the cluster starts the transform, the
transform will pick up from its last successfully saved state and checkpoint.

Close #100891
Pat Whelan 1 rok pred
rodič
commit
44c4788afb
15 zmenil súbory, kde vykonal 328 pridanie a 30 odobranie
  1. 6 0
      docs/changelog/107358.yaml
  2. 4 6
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java
  3. 15 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java
  4. 49 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformNode.java
  5. 8 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java
  6. 2 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java
  7. 22 3
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java
  8. 112 0
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformNodeTests.java
  9. 9 4
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
  10. 3 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java
  11. 7 3
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java
  12. 5 2
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java
  13. 3 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java
  14. 1 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java
  15. 82 6
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java

+ 6 - 0
docs/changelog/107358.yaml

@@ -0,0 +1,6 @@
+pr: 107358
+summary: Check node shutdown before fail
+area: Transform
+type: enhancement
+issues:
+ - 100891

+ 4 - 6
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

@@ -255,14 +255,12 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
             getTransformExtension().getMinFrequency()
         );
         scheduler.start();
+        var clusterStateListener = new TransformClusterStateListener(clusterService, client);
+        var transformNode = new TransformNode(clusterStateListener);
 
-        transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));
+        transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler, transformNode));
 
-        return List.of(
-            transformServices.get(),
-            new TransformClusterStateListener(clusterService, client),
-            new TransformExtensionHolder(getTransformExtension())
-        );
+        return List.of(transformServices.get(), clusterStateListener, new TransformExtensionHolder(getTransformExtension()));
     }
 
     @Override

+ 15 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java

@@ -21,17 +21,21 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
 
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
-class TransformClusterStateListener implements ClusterStateListener {
+class TransformClusterStateListener implements ClusterStateListener, Supplier<Optional<ClusterState>> {
 
     private static final Logger logger = LogManager.getLogger(TransformClusterStateListener.class);
 
     private final Client client;
     private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
+    private final AtomicReference<ClusterState> clusterState = new AtomicReference<>();
 
     TransformClusterStateListener(ClusterService clusterService, Client client) {
         this.client = client;
@@ -46,6 +50,8 @@ class TransformClusterStateListener implements ClusterStateListener {
             return;
         }
 
+        clusterState.set(event.state());
+
         // The atomic flag prevents multiple simultaneous attempts to run alias creation
         // if there is a flurry of cluster state updates in quick succession
         if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) {
@@ -102,4 +108,12 @@ class TransformClusterStateListener implements ClusterStateListener {
         );
     }
 
+    /**
+     * Returns the cluster state saved by the most recent update, or an empty Optional if none has been saved yet.
+     * This differs from {@link ClusterService#state()} in that it will not throw an exception when ClusterState is null.
+     */
+    @Override
+    public Optional<ClusterState> get() {
+        return Optional.ofNullable(clusterState.get());
+    }
 }

+ 49 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformNode.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * Node-local, stateful view of the cluster that the {@link org.elasticsearch.xpack.transform.transforms.TransformTask} consults.
+ * Stateless helpers live in {@link org.elasticsearch.xpack.transform.transforms.TransformNodes}.
+ */
+public class TransformNode {
+    private final Supplier<Optional<ClusterState>> clusterState;
+
+    public TransformNode(Supplier<Optional<ClusterState>> clusterState) {
+        this.clusterState = clusterState;
+    }
+
+    /**
+     * @return an optional holding true when the local node id is contained in the node shutdown metadata, false when it is not,
+     * or an empty optional when the supplier yields no cluster state or the cluster state has no local node id.
+     */
+    public Optional<Boolean> isShuttingDown() {
+        return clusterState.get()
+            .flatMap(
+                current -> Optional.ofNullable(current.nodes().getLocalNodeId())
+                    .map(localId -> current.metadata().nodeShutdowns().contains(localId))
+            );
+    }
+
+    /**
+     * @return the local node id, or the string "null" when the cluster state or the local node id is unavailable,
+     * mirroring what {@link String#valueOf(Object)} produces for a null reference.
+     */
+    public String nodeId() {
+        var latest = clusterState.get();
+        return latest.map(ClusterState::nodes).map(DiscoveryNodes::getLocalNodeId).orElse("null");
+    }
+}

+ 8 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java

@@ -26,17 +26,20 @@ public final class TransformServices {
     private final TransformCheckpointService checkpointService;
     private final TransformAuditor auditor;
     private final TransformScheduler scheduler;
+    private final TransformNode transformNode;
 
     public TransformServices(
         TransformConfigManager configManager,
         TransformCheckpointService checkpointService,
         TransformAuditor auditor,
-        TransformScheduler scheduler
+        TransformScheduler scheduler,
+        TransformNode transformNode
     ) {
         this.configManager = Objects.requireNonNull(configManager);
         this.checkpointService = Objects.requireNonNull(checkpointService);
         this.auditor = Objects.requireNonNull(auditor);
         this.scheduler = Objects.requireNonNull(scheduler);
+        this.transformNode = transformNode;
     }
 
     public TransformConfigManager getConfigManager() {
@@ -54,4 +57,8 @@ public final class TransformServices {
     public TransformScheduler getScheduler() {
         return scheduler;
     }
+
+    public TransformNode getTransformNode() {
+        return transformNode;
+    }
 }

+ 2 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

@@ -497,7 +497,8 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
             transformServices.getScheduler(),
             auditor,
             threadPool,
-            headers
+            headers,
+            transformServices.getTransformNode()
         );
     }
 }

+ 22 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

@@ -40,6 +40,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
+import org.elasticsearch.xpack.transform.TransformNode;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
@@ -68,6 +69,7 @@ public class TransformTask extends AllocatedPersistentTask implements TransformS
     private final TransformIndexerPosition initialPosition;
     private final IndexerState initialIndexerState;
     private final TransformContext context;
+    private final TransformNode transformNode;
     private final SetOnce<ClientTransformIndexer> indexer = new SetOnce<>();
 
     @SuppressWarnings("this-escape")
@@ -81,7 +83,8 @@ public class TransformTask extends AllocatedPersistentTask implements TransformS
         TransformScheduler transformScheduler,
         TransformAuditor auditor,
         ThreadPool threadPool,
-        Map<String, String> headers
+        Map<String, String> headers,
+        TransformNode transformNode
     ) {
         super(id, type, action, TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers);
         this.transform = transform;
@@ -118,6 +121,7 @@ public class TransformTask extends AllocatedPersistentTask implements TransformS
         if (state != null) {
             this.context.setAuthState(state.getAuthState());
         }
+        this.transformNode = transformNode;
     }
 
     public String getTransformId() {
@@ -524,11 +528,26 @@ public class TransformTask extends AllocatedPersistentTask implements TransformS
                 return;
             }
 
-            logger.atError().withThrowable(exception).log("[{}] transform has failed; experienced: [{}].", transform.getId(), reason);
-            auditor.error(transform.getId(), reason);
             // We should not keep retrying. Either the task will be stopped, or started
             // If it is started again, it is registered again.
             transformScheduler.deregisterTransform(getTransformId());
+
+            if (transformNode.isShuttingDown().orElse(false)) {
+                logger.atDebug()
+                    .withThrowable(exception)
+                    .log(
+                        "Aborting transform [{}]. Transform has failed while node [{}] is shutting down. Reason: [{}]",
+                        transform.getId(),
+                        transformNode.nodeId(),
+                        reason
+                    );
+                markAsLocallyAborted("Node is shutting down.");
+                listener.onResponse(null);
+                return;
+            }
+
+            logger.atError().withThrowable(exception).log("[{}] transform has failed; experienced: [{}].", transform.getId(), reason);
+            auditor.error(transform.getId(), reason);
             // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again,
             // we should set this flag to false.
             context.setShouldStopAtCheckpoint(false);

+ 112 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformNodeTests.java

@@ -0,0 +1,112 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform;
+
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class TransformNodeTests extends ESTestCase {
+    private static final String SHUTTING_DOWN_ID = "shuttingDownNodeId";
+    private static final String NOT_SHUTTING_DOWN_ID = "notShuttingDownId";
+
+    /**
+     * When the local node id is listed in the node shutdown metadata
+     * Then we return an optional containing true
+     */
+    public void testIsShuttingDown() {
+        var isShuttingDown = new TransformNode(clusterState(SHUTTING_DOWN_ID)).isShuttingDown();
+        assertTrue(isShuttingDown.isPresent());
+        assertTrue(isShuttingDown.get());
+    }
+
+    /**
+     * When the local node id is not listed in the node shutdown metadata
+     * Then we return an optional containing false
+     */
+    public void testIsNotShuttingDown() {
+        var isShuttingDown = new TransformNode(clusterState(NOT_SHUTTING_DOWN_ID)).isShuttingDown();
+        assertTrue(isShuttingDown.isPresent());
+        assertFalse(isShuttingDown.get());
+    }
+
+    /**
+     * When the cluster state has no local node id
+     * Then we return empty
+     */
+    public void testMissingLocalId() {
+        var isShuttingDown = new TransformNode(clusterState(null)).isShuttingDown();
+        assertFalse(isShuttingDown.isPresent());
+    }
+
+    /**
+     * When the supplier provides no cluster state
+     * Then we return empty
+     */
+    public void testClusterStateMissing() {
+        var isShuttingDown = new TransformNode(Optional::empty).isShuttingDown();
+        assertFalse(isShuttingDown.isPresent());
+    }
+
+    /**
+     * When the cluster state has a local node id
+     * Then return that id
+     */
+    public void testNodeId() {
+        var nodeId = new TransformNode(clusterState(SHUTTING_DOWN_ID)).nodeId();
+        assertThat(nodeId, equalTo(SHUTTING_DOWN_ID));
+    }
+
+    /**
+     * When the supplier provides no cluster state
+     * Then return "null"
+     */
+    public void testNodeIdMissing() {
+        var nodeId = new TransformNode(Optional::empty).nodeId();
+        assertThat(nodeId, equalTo(String.valueOf((String) null)));
+    }
+
+    private Supplier<Optional<ClusterState>> clusterState(String nodeId) {
+        var nodesShutdownMetadata = new NodesShutdownMetadata(
+            Map.of(
+                SHUTTING_DOWN_ID,
+                SingleNodeShutdownMetadata.builder()
+                    .setNodeId(SHUTTING_DOWN_ID)
+                    .setReason("shutdown for a unit test")
+                    .setType(SingleNodeShutdownMetadata.Type.RESTART)
+                    .setStartedAtMillis(randomNonNegativeLong())
+                    .setGracePeriod(null)
+                    .build()
+            )
+        );
+
+        var nodes = DiscoveryNodes.builder().add(DiscoveryNodeUtils.create(SHUTTING_DOWN_ID)).localNodeId(nodeId).masterNodeId(nodeId);
+
+        if (SHUTTING_DOWN_ID.equals(nodeId) == false && nodeId != null) {
+            nodes.add(DiscoveryNodeUtils.create(nodeId));
+        }
+
+        var state = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata).build())
+            .nodes(nodes)
+            .build();
+
+        return () -> Optional.of(state);
+    }
+}

+ 9 - 4
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

@@ -53,6 +53,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
 import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
 import org.elasticsearch.xpack.transform.TransformExtension;
+import org.elasticsearch.xpack.transform.TransformNode;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
@@ -140,7 +141,8 @@ public class ClientTransformIndexerTests extends ESTestCase {
                     mock(IndexBasedTransformConfigManager.class),
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
+                    mock(TransformNode.class)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),
@@ -237,7 +239,8 @@ public class ClientTransformIndexerTests extends ESTestCase {
                     mock(IndexBasedTransformConfigManager.class),
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
+                    mock(TransformNode.class)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),
@@ -323,7 +326,8 @@ public class ClientTransformIndexerTests extends ESTestCase {
                     mock(IndexBasedTransformConfigManager.class),
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
+                    mock(TransformNode.class)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),
@@ -572,7 +576,8 @@ public class ClientTransformIndexerTests extends ESTestCase {
                 mock(IndexBasedTransformConfigManager.class),
                 mock(TransformCheckpointService.class),
                 mock(TransformAuditor.class),
-                new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
+                new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
+                mock(TransformNode.class)
             ),
             mock(CheckpointProvider.class),
             new AtomicReference<>(IndexerState.STOPPED),

+ 3 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java

@@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.TransformExtension;
+import org.elasticsearch.xpack.transform.TransformNode;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
@@ -137,7 +138,8 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
                     transformsConfigManager,
                     mock(TransformCheckpointService.class),
                     auditor,
-                    new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
+                    new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
+                    mock(TransformNode.class)
                 ),
                 checkpointProvider,
                 initialState,

+ 7 - 3
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java

@@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
 import org.elasticsearch.xpack.transform.TransformExtension;
+import org.elasticsearch.xpack.transform.TransformNode;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
@@ -230,7 +231,8 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         configManager,
                         mock(TransformCheckpointService.class),
                         mock(TransformAuditor.class),
-                        new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
+                        new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
+                        mock(TransformNode.class)
                     ),
                     mock(CheckpointProvider.class),
                     new AtomicReference<>(IndexerState.STOPPED),
@@ -315,7 +317,8 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         configManager,
                         mock(TransformCheckpointService.class),
                         mock(TransformAuditor.class),
-                        new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
+                        new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
+                        mock(TransformNode.class)
                     ),
                     mock(CheckpointProvider.class),
                     new AtomicReference<>(IndexerState.STOPPED),
@@ -449,7 +452,8 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                     configManager,
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
+                    mock(TransformNode.class)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),

+ 5 - 2
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java

@@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPositio
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
+import org.elasticsearch.xpack.transform.TransformNode;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider;
@@ -814,7 +815,8 @@ public class TransformIndexerStateTests extends ESTestCase {
             transformConfigManager,
             mock(TransformCheckpointService.class),
             transformAuditor,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
+            mock(TransformNode.class)
         );
 
         MockedTransformIndexer indexer = new MockedTransformIndexer(
@@ -848,7 +850,8 @@ public class TransformIndexerStateTests extends ESTestCase {
             transformConfigManager,
             mock(TransformCheckpointService.class),
             transformAuditor,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
+            mock(TransformNode.class)
         );
 
         MockedTransformIndexerForStatePersistenceTesting indexer = new MockedTransformIndexerForStatePersistenceTesting(

+ 3 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

@@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPositio
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
+import org.elasticsearch.xpack.transform.TransformNode;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider;
@@ -454,7 +455,8 @@ public class TransformIndexerTests extends ESTestCase {
             transformConfigManager,
             mock(TransformCheckpointService.class),
             transformAuditor,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
+            mock(TransformNode.class)
         );
 
         MockedTransformIndexer indexer = new MockedTransformIndexer(

+ 1 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java

@@ -564,7 +564,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
             configManager,
             mockAuditor
         );
-        return new TransformServices(configManager, transformCheckpointService, mockAuditor, scheduler);
+        return new TransformServices(configManager, transformCheckpointService, mockAuditor, scheduler, null);
     }
 
     private TransformPersistentTasksExecutor buildTaskExecutor(TransformServices transformServices) {

+ 82 - 6
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java

@@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.transform.DefaultTransformExtension;
+import org.elasticsearch.xpack.transform.TransformNode;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
 import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
@@ -57,6 +58,7 @@ import org.mockito.verification.VerificationMode;
 
 import java.time.Clock;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,6 +75,8 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -130,7 +134,8 @@ public class TransformTaskTests extends ESTestCase {
             new TransformScheduler(clock, threadPool, Settings.EMPTY, TimeValue.ZERO),
             auditor,
             threadPool,
-            Collections.emptyMap()
+            Collections.emptyMap(),
+            mockTransformNode()
         );
 
         TaskManager taskManager = mock(TaskManager.class);
@@ -174,6 +179,12 @@ public class TransformTaskTests extends ESTestCase {
         assertEquals(state.getReason(), null);
     }
 
+    private TransformNode mockTransformNode() {
+        var node = mock(TransformNode.class);
+        when(node.isShuttingDown()).thenReturn(randomBoolean() ? Optional.of(Boolean.FALSE) : Optional.<Boolean>empty());
+        return node;
+    }
+
     private TransformServices transformServices(Clock clock, TransformAuditor auditor, ThreadPool threadPool) {
         var transformsConfigManager = new InMemoryTransformConfigManager();
         var transformsCheckpointService = new TransformCheckpointService(
@@ -192,7 +203,8 @@ public class TransformTaskTests extends ESTestCase {
             transformsConfigManager,
             transformsCheckpointService,
             auditor,
-            new TransformScheduler(clock, threadPool, Settings.EMPTY, TimeValue.ZERO)
+            new TransformScheduler(clock, threadPool, Settings.EMPTY, TimeValue.ZERO),
+            mock(TransformNode.class)
         );
     }
 
@@ -234,7 +246,8 @@ public class TransformTaskTests extends ESTestCase {
             new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
             auditor,
             threadPool,
-            Collections.emptyMap()
+            Collections.emptyMap(),
+            mockTransformNode()
         );
 
         TaskManager taskManager = mock(TaskManager.class);
@@ -279,6 +292,66 @@ public class TransformTaskTests extends ESTestCase {
         assertEquals(state.getReason(), null);
     }
 
+    public void testFailWhenNodeIsShuttingDown() {
+        var threadPool = mock(ThreadPool.class);
+        when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
+
+        var transformConfig = TransformConfigTests.randomTransformConfigWithoutHeaders();
+        var auditor = MockTransformAuditor.createMockAuditor();
+
+        var transformState = new TransformState(
+            TransformTaskState.STARTED,
+            IndexerState.INDEXING,
+            null,
+            0L,
+            "because",
+            null,
+            null,
+            false,
+            null
+        );
+
+        var node = mock(TransformNode.class);
+        when(node.isShuttingDown()).thenReturn(Optional.of(true));
+        when(node.nodeId()).thenReturn("node");
+
+        var transformTask = new TransformTask(
+            42,
+            "some_type",
+            "some_action",
+            TaskId.EMPTY_TASK_ID,
+            createTransformTaskParams(transformConfig.getId()),
+            transformState,
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
+            auditor,
+            threadPool,
+            Collections.emptyMap(),
+            node
+        );
+
+        var taskManager = mock(TaskManager.class);
+        var persistentTasksService = mock(PersistentTasksService.class);
+        transformTask.init(persistentTasksService, taskManager, "task-id", 42);
+        var listenerCalled = new AtomicBoolean(false);
+        transformTask.fail(null, "because", ActionTestUtils.assertNoFailureListener(r -> { listenerCalled.compareAndSet(false, true); }));
+
+        var state = transformTask.getState();
+        assertEquals(TransformTaskState.STARTED, state.getTaskState());
+        assertEquals(IndexerState.STARTED, state.getIndexerState());
+
+        assertTrue(listenerCalled.get());
+        // verify the task was unregistered once and one completion request carrying the "Node is shutting down." reason was sent
+        verify(taskManager, times(1)).unregister(any());
+        verify(persistentTasksService, times(1)).sendCompletionRequest(
+            eq("task-id"),
+            eq(42L),
+            isNull(),
+            eq("Node is shutting down."),
+            isNull(),
+            any()
+        );
+    }
+
     public void testGetTransformTask() {
         {
             ClusterState clusterState = ClusterState.EMPTY_STATE;
@@ -453,7 +526,8 @@ public class TransformTaskTests extends ESTestCase {
             new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
             auditor,
             threadPool,
-            Collections.emptyMap()
+            Collections.emptyMap(),
+            mockTransformNode()
         );
         assertThat(transformTask.getContext().getAuthState().getStatus(), is(equalTo(HealthStatus.GREEN)));
 
@@ -502,7 +576,8 @@ public class TransformTaskTests extends ESTestCase {
             new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
             auditor,
             threadPool,
-            Collections.emptyMap()
+            Collections.emptyMap(),
+            mockTransformNode()
         );
     }
 
@@ -603,7 +678,8 @@ public class TransformTaskTests extends ESTestCase {
             new TransformScheduler(mock(Clock.class), threadPool, Settings.EMPTY, TimeValue.ZERO),
             auditor,
             threadPool,
-            Collections.emptyMap()
+            Collections.emptyMap(),
+            mockTransformNode()
         );
 
         ClientTransformIndexer indexer = mock(ClientTransformIndexer.class);