Browse Source

[ML] Add reason to DataFrameAnalyticsTask setFailed log message (#52659)

David Kyle 5 years ago
parent
commit
28582d80df

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -60,7 +60,8 @@ public final class Messages {
     public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics";
-    public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE = "Successfully updated analytics task state to [{0}]";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON =
+            "Updated analytics task state to [{0}] with reason [{1}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";

+ 9 - 9
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

@@ -93,12 +93,12 @@ public class DataFrameAnalyticsManager {
                         executeJobInMiddleOfReindexing(task, config);
                         break;
                     default:
-                        task.updateState(DataFrameAnalyticsState.FAILED, "Cannot execute analytics task [" + config.getId() +
+                        task.setFailed("Cannot execute analytics task [" + config.getId() +
                             "] as it is in unknown state [" + currentState + "]. Must be one of [STARTED, REINDEXING, ANALYZING]");
                 }
 
             },
-            error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
+            error -> task.setFailed(error.getMessage())
         );
 
         // Retrieve configuration
@@ -122,13 +122,13 @@ public class DataFrameAnalyticsManager {
             case FIRST_TIME:
                 task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
                     updatedTask -> reindexDataframeAndStartAnalysis(task, config),
-                    error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
+                    error -> task.setFailed(error.getMessage())
                 ));
                 break;
             case RESUMING_REINDEXING:
                 task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
                     updatedTask -> executeJobInMiddleOfReindexing(task, config),
-                    error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
+                    error -> task.setFailed(error.getMessage())
                 ));
                 break;
             case RESUMING_ANALYZING:
@@ -136,7 +136,7 @@ public class DataFrameAnalyticsManager {
                 break;
             case FINISHED:
             default:
-                task.updateState(DataFrameAnalyticsState.FAILED, "Unexpected starting state [" + startingState + "]");
+                task.setFailed("Unexpected starting state [" + startingState + "]");
         }
     }
 
@@ -151,7 +151,7 @@ public class DataFrameAnalyticsManager {
                     if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
                         reindexDataframeAndStartAnalysis(task, config);
                     } else {
-                        task.updateState(DataFrameAnalyticsState.FAILED, e.getMessage());
+                        task.setFailed(e.getMessage());
                     }
                 }
             ));
@@ -178,7 +178,7 @@ public class DataFrameAnalyticsManager {
                     Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
                 startAnalytics(task, config);
             },
-            error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
+            error -> task.setFailed(error.getMessage())
         );
 
         // Reindex
@@ -244,12 +244,12 @@ public class DataFrameAnalyticsManager {
                         if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
                             // Task has stopped
                         } else {
-                            task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
+                            task.setFailed(error.getMessage());
                         }
                     }
                 ));
             },
-            error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
+            error -> task.setFailed(error.getMessage())
         );
 
         ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(

+ 8 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

@@ -177,17 +177,20 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
         }
     }
 
-    public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
-        DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
+    public void setFailed(String reason) {
+        DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
+                getAllocationId(), reason);
         updatePersistentTaskState(
             newTaskState,
             ActionListener.wrap(
                 updatedTask -> {
-                    auditor.info(getParams().getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE, state));
-                    LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state);
+                    String message = Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON,
+                            DataFrameAnalyticsState.FAILED, reason);
+                    auditor.info(getParams().getId(), message);
+                    LOGGER.info("[{}] {}", getParams().getId(), message);
                 },
                 e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]",
-                    getParams().getId(), state, reason), e)
+                    getParams().getId(), DataFrameAnalyticsState.FAILED, reason), e)
             )
         );
     }

+ 3 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

@@ -23,7 +23,6 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -110,8 +109,7 @@ public class AnalyticsProcessManager {
                     return;
                 }
                 if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
-                    task.updateState(
-                        DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists");
+                    task.setFailed("[" + config.getId() + "] Could not create process as one already exists");
                     return;
                 }
             }
@@ -193,7 +191,7 @@ public class AnalyticsProcessManager {
                 task.markAsCompleted();
             } else {
                 LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
-                task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
+                task.setFailed(processContext.getFailureReason());
                 // Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason.
             }
         }
@@ -265,7 +263,7 @@ public class AnalyticsProcessManager {
             process.restoreState(state);
         } catch (Exception e) {
             LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e);
-            task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage());
+            task.setFailed("Failed to restore state: " + e.getMessage());
         }
     }
 

+ 1 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java

@@ -14,7 +14,6 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests;
 import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
 import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
@@ -131,7 +130,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
         inOrder.verify(task).getStatsHolder();
         inOrder.verify(task).isStopping();
         inOrder.verify(task).getAllocationId();
-        inOrder.verify(task).updateState(DataFrameAnalyticsState.FAILED, "[config-id] Could not create process as one already exists");
+        inOrder.verify(task).setFailed("[config-id] Could not create process as one already exists");
         verifyNoMoreInteractions(task);
     }