Browse Source

[ML] Do not mark the DFA job as FAILED when a failure occurs after the node is shut down (#61331)

Przemysław Witek 5 years ago
parent
commit
106ebd6b92

+ 3 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -729,8 +729,9 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
         MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider,
             dataFrameAnalyticsConfigProvider);
         this.memoryTracker.set(memoryTracker);
-        MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(clusterService, datafeedManager, mlController,
-            autodetectProcessManager, memoryTracker);
+        MlLifeCycleService mlLifeCycleService =
+            new MlLifeCycleService(
+                clusterService, datafeedManager, mlController, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
         MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
             new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService);
 

+ 10 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

@@ -8,9 +8,10 @@ package org.elasticsearch.xpack.ml;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
+import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 import org.elasticsearch.xpack.ml.process.MlController;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
-import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -20,13 +21,16 @@ public class MlLifeCycleService {
     private final DatafeedManager datafeedManager;
     private final MlController mlController;
     private final AutodetectProcessManager autodetectProcessManager;
+    private final DataFrameAnalyticsManager analyticsManager;
     private final MlMemoryTracker memoryTracker;
 
     MlLifeCycleService(ClusterService clusterService, DatafeedManager datafeedManager, MlController mlController,
-                       AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) {
+                       AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager,
+                       MlMemoryTracker memoryTracker) {
         this.datafeedManager = Objects.requireNonNull(datafeedManager);
         this.mlController = Objects.requireNonNull(mlController);
         this.autodetectProcessManager = Objects.requireNonNull(autodetectProcessManager);
+        this.analyticsManager = Objects.requireNonNull(analyticsManager);
         this.memoryTracker = Objects.requireNonNull(memoryTracker);
         clusterService.addLifecycleListener(new LifecycleListener() {
             @Override
@@ -38,9 +42,10 @@ public class MlLifeCycleService {
 
     public synchronized void stop() {
         try {
-            // This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the
-            // datafeeds, so they get reassigned.  We have to do this first, otherwise the datafeeds
-            // could fail if they send data to a dead autodetect process.
+            // This prevents data frame analytics from being marked as failed due to exceptions occurring while the node is shutting down.
+            analyticsManager.markNodeAsShuttingDown();
+            // This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the datafeeds, so they get reassigned.
+            // We have to do this first, otherwise the datafeeds could fail if they send data to a dead autodetect process.
             datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
             // This kills autodetect processes WITHOUT closing the jobs, so they get reassigned.
             autodetectProcessManager.killAllProcessesOnThisNode();

+ 11 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

@@ -50,6 +50,7 @@ import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
 
 import java.time.Clock;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -67,6 +68,8 @@ public class DataFrameAnalyticsManager {
     private final AnalyticsProcessManager processManager;
     private final DataFrameAnalyticsAuditor auditor;
     private final IndexNameExpressionResolver expressionResolver;
+    /** Indicates whether the node is shutting down. */
+    private final AtomicBoolean nodeShuttingDown = new AtomicBoolean();
 
     public DataFrameAnalyticsManager(NodeClient client, DataFrameAnalyticsConfigProvider configProvider,
                                      AnalyticsProcessManager processManager, DataFrameAnalyticsAuditor auditor,
@@ -392,4 +395,12 @@ public class DataFrameAnalyticsManager {
     /**
      * Stops the given data frame analytics task by delegating to the process manager.
      *
      * @param task the task to stop
      */
     public void stop(DataFrameAnalyticsTask task) {
         processManager.stop(task);
     }
+
+    /**
+     * Reports whether this manager has been told that the node is shutting down.
+     *
+     * @return the current value of the {@code nodeShuttingDown} flag; {@code true} once
+     *         {@link #markNodeAsShuttingDown()} has been called
+     */
+    public boolean isNodeShuttingDown() {
+        return nodeShuttingDown.get();
+    }
+
+    /**
+     * Records that the node is shutting down by setting the {@code nodeShuttingDown} flag to {@code true}.
+     * Nothing in this class (as visible here) resets the flag afterwards.
+     */
+    public void markNodeAsShuttingDown() {
+        nodeShuttingDown.set(true);
+    }
 }

+ 18 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

@@ -32,8 +32,10 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.reindex.BulkByScrollTask;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
+import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.tasks.TaskResult;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
@@ -106,6 +108,14 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
         return statsHolder;
     }
 
+    /**
+     * Forwards all arguments unchanged to the superclass implementation.
+     * NOTE(review): this override adds no behavior; presumably it exists only so that code in this
+     * package (e.g. unit tests) can call the protected method directly - confirm.
+     */
+    @Override
+    protected void init(PersistentTasksService persistentTasksService,
+                        TaskManager taskManager,
+                        String persistentTaskId,
+                        long allocationId) {
+        super.init(persistentTasksService, taskManager, persistentTaskId, allocationId);
+    }
+
     @Override
     protected void onCancelled() {
         stop(getReasonCancelled(), StopDataFrameAnalyticsAction.DEFAULT_TIMEOUT);
@@ -200,10 +210,16 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
     }
 
     public void setFailed(Exception error) {
+        if (analyticsManager.isNodeShuttingDown()) {
+            LOGGER.warn(
+                new ParameterizedMessage("[{}] *Not* setting task to failed because the node is being shutdown", taskParams.getId()),
+                error);
+            return;
+        }
         LOGGER.error(new ParameterizedMessage("[{}] Setting task to failed", taskParams.getId()), error);
         String reason = ExceptionsHelper.unwrapCause(error).getMessage();
-        DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
-                getAllocationId(), reason);
+        DataFrameAnalyticsTaskState newTaskState =
+            new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, getAllocationId(), reason);
         updatePersistentTaskState(
             newTaskState,
             ActionListener.wrap(
@@ -371,5 +387,4 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
         }
         return StartingState.RESUMING_ANALYZING;
     }
-
 }

+ 64 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTaskTests.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.dataframe;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequest;
@@ -12,16 +13,25 @@ import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.persistent.PersistentTasksService;
+import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsActionResponseTests;
+import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
 import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
 import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask.StartingState;
+import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
+import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
 import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.stubbing.Answer;
@@ -34,9 +44,13 @@ import java.util.Map;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class DataFrameAnalyticsTaskTests extends ESTestCase {
@@ -156,6 +170,56 @@ public class DataFrameAnalyticsTaskTests extends ESTestCase {
             ".ml-state-dummy");
     }
 
+    /** Runs the shared {@code setFailed} scenario with the node reported as not shutting down. */
+    public void testSetFailed() {
+        testSetFailed(false);
+    }
+
+    /** Runs the shared {@code setFailed} scenario with the node reported as shutting down. */
+    public void testSetFailedDuringNodeShutdown() {
+        testSetFailed(true);
+    }
+
+    /**
+     * Shared body for the {@code setFailed} tests. Builds a {@link DataFrameAnalyticsTask} on top of mocked
+     * collaborators, calls {@code setFailed}, and verifies that the FAILED task state is submitted through
+     * the client only when the manager reports that the node is not shutting down.
+     *
+     * @param nodeShuttingDown the value the mocked manager returns from {@code isNodeShuttingDown()}
+     */
+    private void testSetFailed(boolean nodeShuttingDown) {
+        ThreadPool threadPool = mock(ThreadPool.class);
+        when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
+        Client client = mock(Client.class);
+        when(client.threadPool()).thenReturn(threadPool);
+        ClusterService clusterService = mock(ClusterService.class);
+        DataFrameAnalyticsManager analyticsManager = mock(DataFrameAnalyticsManager.class);
+        // The mocked manager is the only source of the "shutting down" signal in this test.
+        when(analyticsManager.isNodeShuttingDown()).thenReturn(nodeShuttingDown);
+        DataFrameAnalyticsAuditor auditor = mock(DataFrameAnalyticsAuditor.class);
+        // A real PersistentTasksService is wired to the mocked client so the state update can be verified on the client.
+        PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, mock(ThreadPool.class), client);
+        TaskManager taskManager = mock(TaskManager.class);
+
+        StartDataFrameAnalyticsAction.TaskParams taskParams =
+            new StartDataFrameAnalyticsAction.TaskParams(
+                "job-id",
+                Version.CURRENT,
+                List.of(
+                    new PhaseProgress(ProgressTracker.REINDEXING, 0),
+                    new PhaseProgress(ProgressTracker.LOADING_DATA, 0),
+                    new PhaseProgress(ProgressTracker.WRITING_RESULTS, 0)),
+                false);
+        DataFrameAnalyticsTask task =
+            new DataFrameAnalyticsTask(
+                123, "type", "action", null, Map.of(), client, clusterService, analyticsManager, auditor, taskParams);
+        // The persistent task id and allocation id set here are the ones expected in the update request below.
+        task.init(persistentTasksService, taskManager, "task-id", 42);
+        Exception exception = new Exception("some exception");
+
+        task.setFailed(exception);
+
+        // setFailed must consult the shutdown flag in both scenarios.
+        verify(analyticsManager).isNodeShuttingDown();
+        verify(client, atLeastOnce()).settings();
+        verify(client, atLeastOnce()).threadPool();
+        if (nodeShuttingDown == false) {
+            // Only when the node is not shutting down is the FAILED state, carrying the exception message, sent.
+            verify(client).execute(
+                same(UpdatePersistentTaskStatusAction.INSTANCE),
+                eq(new UpdatePersistentTaskStatusAction.Request(
+                    "task-id", 42, new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, 42, "some exception"))),
+                any());
+        }
+        // Any interaction beyond those verified above (including an update request during shutdown) fails the test.
+        verifyNoMoreInteractions(client, clusterService, analyticsManager, auditor, taskManager);
+    }
+
     @SuppressWarnings("unchecked")
     private static <Response> Answer<Response> withResponse(Response response) {
         return invocationOnMock -> {

+ 33 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/DataFrameAnalyticsManagerTests.java

@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.dataframe.process;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
+import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
+import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
+
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+
+/** Unit tests for the node-shutdown flag exposed by {@link DataFrameAnalyticsManager}. */
+public class DataFrameAnalyticsManagerTests extends ESTestCase {
+
+    /**
+     * Checks that a freshly constructed manager reports the node as not shutting down, and that
+     * {@code markNodeAsShuttingDown()} makes {@code isNodeShuttingDown()} return {@code true}.
+     */
+    public void testNodeShuttingDown() {
+        // All constructor dependencies are mocks; none of them is stubbed by this test.
+        DataFrameAnalyticsManager manager =
+            new DataFrameAnalyticsManager(
+                mock(NodeClient.class),
+                mock(DataFrameAnalyticsConfigProvider.class),
+                mock(AnalyticsProcessManager.class),
+                mock(DataFrameAnalyticsAuditor.class),
+                mock(IndexNameExpressionResolver.class));
+        assertThat(manager.isNodeShuttingDown(), is(false));
+
+        manager.markNodeAsShuttingDown();
+        assertThat(manager.isNodeShuttingDown(), is(true));
+    }
+}