Browse Source

[ML] Audit job open failures and stop any corresponding datafeed (#80665)

The anomaly detection code contained an assumption dating back
to 2016 that if a job failed then its datafeed would notice and
stop itself. That works if the job fails on a node after it has
successfully started up. But it doesn't work if the job fails
during the startup sequence. If the job is being started for the
first time then the datafeed won't be running, so there's no
problem, but if the job fails when it's being reassigned to a
new node then it breaks down, because the datafeed is started
by not assigned to any node at that instant.

This PR addresses this by making the job force-stop its own
datafeed if it fails during its startup sequence and the datafeed
is started.

Fixes #48934

Additionally, auditing of job failures during the startup
sequence is moved so that it happens for all failure scenarios
instead of just one.

Fixes #80621
David Roberts 4 years ago
parent
commit
d95bca8303

+ 102 - 0
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

@@ -63,6 +63,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
 import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
 import org.junit.After;
@@ -542,7 +543,108 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
             assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs));
             assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
         });
+    }
+
+    public void testClusterWithTwoMlNodes_StopsDatafeed_GivenJobFailsOnReassign() throws Exception {
+        internalCluster().ensureAtMostNumDataNodes(0);
+        logger.info("Starting dedicated master node...");
+        internalCluster().startMasterOnlyNode();
+        logger.info("Starting ml and data node...");
+        internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.ML_ROLE)));
+        logger.info("Starting another ml and data node...");
+        internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.ML_ROLE)));
+        ensureStableCluster();
+
+        // index some datafeed data
+        client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get();
+        long numDocs = 80000;
+        long now = System.currentTimeMillis();
+        long weekAgo = now - 604800000;
+        long twoWeeksAgo = weekAgo - 604800000;
+        indexDocs(logger, "data", numDocs, twoWeeksAgo, weekAgo);
+
+        String jobId = "test-node-goes-down-while-running-job";
+        String datafeedId = jobId + "-datafeed";
+
+        Job.Builder job = createScheduledJob(jobId);
+        PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
+        client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
+
+        DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"), TimeValue.timeValueHours(1));
+        PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
+        client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
+
+        client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
+
+        assertBusy(() -> {
+            GetJobsStatsAction.Response statsResponse = client().execute(
+                GetJobsStatsAction.INSTANCE,
+                new GetJobsStatsAction.Request(job.getId())
+            ).actionGet();
+            assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
+        }, 30, TimeUnit.SECONDS);
+
+        DiscoveryNode nodeRunningJob = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId()))
+            .actionGet()
+            .getResponse()
+            .results()
+            .get(0)
+            .getNode();
+
+        setMlIndicesDelayedNodeLeftTimeoutToZero();
+
+        StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
+        client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
+
+        waitForJobToHaveProcessedAtLeast(jobId, 1000);
+
+        // The datafeed should be started
+        assertBusy(() -> {
+            GetDatafeedsStatsAction.Response statsResponse = client().execute(
+                GetDatafeedsStatsAction.INSTANCE,
+                new GetDatafeedsStatsAction.Request(config.getId())
+            ).actionGet();
+            assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
+        }, 30, TimeUnit.SECONDS);
+
+        // Create a problem that will make the job fail when it restarts on a different node
+        String snapshotId = "123";
+        ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build();
+        JobResultsPersister jobResultsPersister = internalCluster().getInstance(
+            JobResultsPersister.class,
+            internalCluster().getMasterName()
+        );
+        jobResultsPersister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE, () -> true);
+        UpdateJobAction.Request updateJobRequest = UpdateJobAction.Request.internal(
+            jobId,
+            new JobUpdate.Builder(jobId).setModelSnapshotId(snapshotId).build()
+        );
+        client().execute(UpdateJobAction.INSTANCE, updateJobRequest).actionGet();
+        refresh(AnomalyDetectorsIndex.resultsWriteAlias(jobId));
+
+        // Make the job move to a different node
+        internalCluster().stopNode(nodeRunningJob.getName());
+
+        // Wait for the job to fail during reassignment
+        assertBusy(() -> {
+            GetJobsStatsAction.Response statsResponse = client().execute(
+                GetJobsStatsAction.INSTANCE,
+                new GetJobsStatsAction.Request(job.getId())
+            ).actionGet();
+            assertEquals(JobState.FAILED, statsResponse.getResponse().results().get(0).getState());
+        }, 30, TimeUnit.SECONDS);
+
+        // The datafeed should then be stopped
+        assertBusy(() -> {
+            GetDatafeedsStatsAction.Response statsResponse = client().execute(
+                GetDatafeedsStatsAction.INSTANCE,
+                new GetDatafeedsStatsAction.Request(config.getId())
+            ).actionGet();
+            assertEquals(DatafeedState.STOPPED, statsResponse.getResponse().results().get(0).getDatafeedState());
+        }, 30, TimeUnit.SECONDS);
 
+        // Force close the failed job to clean up
+        client().execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId).setForce(true)).actionGet();
     }
 
     private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimit) throws Exception {

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java

@@ -145,7 +145,7 @@ public class DatafeedNodeSelector {
         }
 
         if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
-            // lets try again later when the job has been opened:
+            // let's try again later when the job has been opened:
             String reason = "cannot start datafeed ["
                 + datafeedId
                 + "], because the job's ["

+ 7 - 15
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -404,13 +404,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
         // Step 3. Set scheduled events on message and write update process message
         ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(events -> {
             updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
-            communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
-                if (e == null) {
-                    handler.accept(null);
-                } else {
-                    handler.accept(e);
-                }
-            });
+            communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> handler.accept(e));
         }, handler);
 
         // Step 2. Set the filters on the message and get scheduled events
@@ -545,20 +539,18 @@ public class AutodetectProcessManager implements ClusterStateListener {
 
         // Start the process
         ActionListener<Boolean> stateAliasHandler = ActionListener.wrap(
-            r -> {
-                jobManager.getJob(
-                    jobId,
-                    ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true))
-                );
-            },
+            r -> jobManager.getJob(
+                jobId,
+                ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true))
+            ),
             e -> {
                 if (ExceptionsHelper.unwrapCause(e) instanceof InvalidAliasNameException) {
                     String msg = "Detected a problem with your setup of machine learning, the state index alias ["
                         + AnomalyDetectorsIndex.jobStateIndexWriteAlias()
                         + "] exists as index but must be an alias.";
                     logger.error(new ParameterizedMessage("[{}] {}", jobId, msg), e);
-                    auditor.error(jobId, msg);
-                    setJobState(jobTask, JobState.FAILED, msg, e2 -> closeHandler.accept(e, true));
+                    // The close handler is responsible for auditing this and setting the job state to failed
+                    closeHandler.accept(new IllegalStateException(msg, e), true);
                 } else {
                     closeHandler.accept(e, true);
                 }

+ 63 - 23
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

@@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
+import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
 import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
@@ -298,8 +299,8 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
             return;
         }
 
-        ActionListener<Boolean> hasRunningDatafeedTaskListener = ActionListener.wrap(hasRunningDatafeed -> {
-            if (hasRunningDatafeed && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {
+        ActionListener<String> getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> {
+            if (runningDatafeedId != null && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {
 
                 // This job has a running datafeed attached to it.
                 // In order to prevent gaps in the model we revert to the current snapshot deleting intervening results.
@@ -319,45 +320,84 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
             }
         });
 
-        hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener);
+        getRunningDatafeed(jobTask.getJobId(), getRunningDatafeedListener);
     }
 
     private void failTask(JobTask jobTask, String reason) {
+        String jobId = jobTask.getJobId();
+        auditor.error(jobId, reason);
         JobTaskState failedState = new JobTaskState(JobState.FAILED, jobTask.getAllocationId(), reason);
-        jobTask.updatePersistentTaskState(
-            failedState,
-            ActionListener.wrap(
-                r -> logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobTask.getJobId())),
-                e -> {
-                    logger.error(
-                        new ParameterizedMessage(
-                            "[{}] error while setting task state to failed; marking task as failed",
-                            jobTask.getJobId()
-                        ),
-                        e
-                    );
-                    jobTask.markAsFailed(e);
-                }
-            )
-        );
+        jobTask.updatePersistentTaskState(failedState, ActionListener.wrap(r -> {
+            logger.debug("[{}] updated task state to failed", jobId);
+            stopAssociatedDatafeedForFailedJob(jobId);
+        }, e -> {
+            logger.error(new ParameterizedMessage("[{}] error while setting task state to failed; marking task as failed", jobId), e);
+            jobTask.markAsFailed(e);
+            stopAssociatedDatafeedForFailedJob(jobId);
+        }));
+    }
+
+    private void stopAssociatedDatafeedForFailedJob(String jobId) {
+
+        if (autodetectProcessManager.isNodeDying()) {
+            // The node shutdown caught us at a bad time, and we cannot stop the datafeed
+            return;
+        }
+
+        ActionListener<String> getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> {
+            if (runningDatafeedId == null) {
+                return;
+            }
+            StopDatafeedAction.Request request = new StopDatafeedAction.Request(runningDatafeedId);
+            request.setForce(true);
+            executeAsyncWithOrigin(
+                client,
+                ML_ORIGIN,
+                StopDatafeedAction.INSTANCE,
+                request,
+                ActionListener.wrap(
+                    // StopDatafeedAction will audit the stopping of the datafeed if it succeeds so we don't need to do that here
+                    r -> logger.info("[{}] stopped associated datafeed [{}] after job failure", jobId, runningDatafeedId),
+                    e -> {
+                        if (autodetectProcessManager.isNodeDying() == false) {
+                            logger.error(
+                                new ParameterizedMessage(
+                                    "[{}] failed to stop associated datafeed [{}] after job failure",
+                                    jobId,
+                                    runningDatafeedId
+                                ),
+                                e
+                            );
+                            auditor.error(jobId, "failed to stop associated datafeed after job failure");
+                        }
+                    }
+                )
+            );
+        }, e -> {
+            if (autodetectProcessManager.isNodeDying() == false) {
+                logger.error(new ParameterizedMessage("[{}] failed to search for associated datafeed", jobId), e);
+            }
+        });
+
+        getRunningDatafeed(jobId, getRunningDatafeedListener);
     }
 
     private boolean isMasterNodeVersionOnOrAfter(Version version) {
         return clusterState.nodes().getMasterNode().getVersion().onOrAfter(version);
     }
 
-    private void hasRunningDatafeedTask(String jobId, ActionListener<Boolean> listener) {
+    private void getRunningDatafeed(String jobId, ActionListener<String> listener) {
         ActionListener<Set<String>> datafeedListener = ActionListener.wrap(datafeeds -> {
             assert datafeeds.size() <= 1;
             if (datafeeds.isEmpty()) {
-                listener.onResponse(false);
+                listener.onResponse(null);
                 return;
             }
 
             String datafeedId = datafeeds.iterator().next();
             PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
             PersistentTasksCustomMetadata.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
-            listener.onResponse(datafeedTask != null);
+            listener.onResponse(datafeedTask != null ? datafeedId : null);
         }, listener::onFailure);
 
         datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singleton(jobId), datafeedListener);
@@ -504,7 +544,7 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
                 }
             } else if (autodetectProcessManager.isNodeDying() == false) {
                 logger.error(new ParameterizedMessage("[{}] failed to open job", jobTask.getJobId()), e2);
-                failTask(jobTask, "failed to open job");
+                failTask(jobTask, "failed to open job: " + e2.getMessage());
             }
         });
     }