浏览代码

[ML][Transforms] adjusting when and what to audit (#45876)

* [ML][Transforms] adjusting when and what to audit

* Update DataFrameTransformTask.java

* removing unnecessary audit message
Benjamin Trent 6 年之前
父节点
当前提交
153ea9451d

+ 21 - 11
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -306,6 +306,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
             },
             exc -> {
+                auditor.warning(transform.getId(),
+                    "Failed to persist to cluster state while marking task as started. Failure: " + exc.getMessage());
                 logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
                 getIndexer().stop();
                 listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
@@ -412,7 +414,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 listener.onResponse(success);
             },
             failure -> {
-                auditor.warning(transform.getId(), "Failed to persist to state to cluster state: " + failure.getMessage());
                 logger.error(new ParameterizedMessage("[{}] failed to update cluster state for data frame transform.",
                     transform.getId()),
                     failure);
@@ -434,7 +435,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
         if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
             logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason);
-            auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state.");
             listener.onResponse(null);
             return;
         }
@@ -459,7 +459,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         persistStateToClusterState(newState, ActionListener.wrap(
             r -> listener.onResponse(null),
             e -> {
-                logger.error(new ParameterizedMessage("[{}] failed to set task state as failed to cluster state.", getTransformId()),
+                String msg = "Failed to persist to cluster state while marking task as failed with reason [" + reason + "].";
+                auditor.warning(transform.getId(),
+                    msg + " Failure: " + e.getMessage());
+                logger.error(new ParameterizedMessage("[{}] {}", getTransformId(), msg),
                     e);
                 listener.onFailure(e);
             }
@@ -945,12 +948,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         protected void onFailure(Exception exc) {
             // the failure handler must not throw an exception due to internal problems
             try {
-                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
-                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
-                if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
-                    auditor.warning(transformTask.getTransformId(), "Data frame transform encountered an exception: " + exc.getMessage());
-                    lastAuditedExceptionMessage = exc.getMessage();
-                }
                 handleFailure(exc);
             } catch (Exception e) {
                 logger.error(
@@ -1052,13 +1049,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                             createCheckpointException -> {
                                 logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", transformId),
                                     createCheckpointException);
-                                listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException));
+                                listener.onFailure(
+                                    new RuntimeException("Failed to create checkpoint due to " + createCheckpointException.getMessage(),
+                                        createCheckpointException));
                             }
                     )),
                     getCheckPointException -> {
                         logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", transformId),
                             getCheckPointException);
-                        listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
+                        listener.onFailure(
+                            new RuntimeException("Failed to retrieve checkpoint due to " + getCheckPointException.getMessage(),
+                                getCheckPointException));
                     }
             ));
         }
@@ -1103,6 +1104,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                     "task encountered irrecoverable failure: " + e.getMessage() :
                     "task encountered more than " + transformTask.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
                 failIndexer(failureMessage);
+            } else {
+                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
+                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
+                if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
+                    auditor.warning(transformTask.getTransformId(),
+                        "Data frame transform encountered an exception: " + e.getMessage() +
+                            " Will attempt again at next scheduled trigger.");
+                    lastAuditedExceptionMessage = e.getMessage();
+                }
             }
         }