Bläddra i källkod

[ML] Improve DF analytics audits and logging (#53179)

Adds audits for when the job starts reindexing, loading data,
analyzing, writing results. Also adds some info logging.
Dimitris Athanasiou 5 år sedan
förälder
incheckning
51dfe7fd25

+ 6 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -65,9 +65,14 @@ public final class Messages {
     public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
-    public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING = "Finished reindexing to destination index [{0}]";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING =
+        "Finished reindexing to destination index [{0}], took [{1}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS = "Finished analysis";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_RESTORING_STATE = "Restoring from previous model state";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA = "Started loading data";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING = "Started analyzing";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS = "Started writing results";
 
     public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
     public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";

+ 12 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

@@ -119,7 +119,11 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "Starting analytics on node",
             "Started analytics",
             expectedDestIndexAuditMessage(),
+            "Started reindexing to destination index [" + destIndex + "]",
             "Finished reindexing to destination index [" + destIndex + "]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
         assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
     }
@@ -160,7 +164,11 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "Starting analytics on node",
             "Started analytics",
             expectedDestIndexAuditMessage(),
+            "Started reindexing to destination index [" + destIndex + "]",
             "Finished reindexing to destination index [" + destIndex + "]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
         assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
     }
@@ -223,7 +231,11 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "Starting analytics on node",
             "Started analytics",
             expectedDestIndexAuditMessage(),
+            "Started reindexing to destination index [" + destIndex + "]",
             "Finished reindexing to destination index [" + destIndex + "]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
         assertEvaluation(dependentVariable, dependentVariableValues, "ml." + predictedClassField);
     }

+ 2 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

@@ -233,6 +233,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
         // Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
         // finished the job (as this is a very short analytics job), all without the audit being fully written.
         assertBusy(() -> assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX)));
+
         @SuppressWarnings("unchecked")
         Matcher<String>[] itemMatchers = Arrays.stream(expectedAuditMessagePrefixes).map(Matchers::startsWith).toArray(Matcher[]::new);
         assertBusy(() -> {
@@ -252,6 +253,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
             .setIndices(NotificationsIndex.NOTIFICATIONS_INDEX)
             .addSort("timestamp", SortOrder.ASC)
             .setQuery(QueryBuilders.termQuery("job_id", dataFrameAnalyticsId))
+            .setSize(100)
             .request();
         SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();
 

+ 12 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

@@ -103,7 +103,11 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "Starting analytics on node",
             "Started analytics",
             "Creating destination index [" + destIndex + "]",
+            "Started reindexing to destination index [" + destIndex + "]",
             "Finished reindexing to destination index [" + destIndex + "]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 
@@ -142,7 +146,11 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "Starting analytics on node",
             "Started analytics",
             "Creating destination index [" + destIndex + "]",
+            "Started reindexing to destination index [" + destIndex + "]",
             "Finished reindexing to destination index [" + destIndex + "]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 
@@ -196,7 +204,11 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "Starting analytics on node",
             "Started analytics",
             "Creating destination index [" + destIndex + "]",
+            "Started reindexing to destination index [" + destIndex + "]",
             "Finished reindexing to destination index [" + destIndex + "]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 

+ 24 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java

@@ -126,7 +126,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
             "Starting analytics on node",
             "Started analytics",
             "Creating destination index [test-outlier-detection-with-few-docs-results]",
+            "Started reindexing to destination index [test-outlier-detection-with-few-docs-results]",
             "Finished reindexing to destination index [test-outlier-detection-with-few-docs-results]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 
@@ -181,7 +185,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
             "Starting analytics on node",
             "Started analytics",
             "Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
+            "Started reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
             "Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 
@@ -262,7 +270,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
             "Starting analytics on node",
             "Started analytics",
             "Creating destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
+            "Started reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
             "Finished reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 
@@ -387,7 +399,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
             "Starting analytics on node",
             "Started analytics",
             "Creating destination index [test-outlier-detection-with-multiple-source-indices-results]",
+            "Started reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
             "Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 
@@ -445,7 +461,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
             "Starting analytics on node",
             "Started analytics",
             "Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
+            "Started reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
             "Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 
@@ -699,7 +719,11 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
             "Starting analytics on node",
             "Started analytics",
             "Creating destination index [test-outlier-detection-with-custom-params-results]",
+            "Started reindexing to destination index [test-outlier-detection-with-custom-params-results]",
             "Finished reindexing to destination index [test-outlier-detection-with-custom-params-results]",
+            "Started loading data",
+            "Started analyzing",
+            "Started writing results",
             "Finished analysis");
     }
 }

+ 5 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

@@ -213,7 +213,8 @@ public class DataFrameAnalyticsManager {
                 task.setReindexingFinished();
                 auditor.info(
                     config.getId(),
-                    Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
+                    Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
+                        reindexResponse.getTook()));
                 startAnalytics(task, config);
             },
             error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage())
@@ -233,9 +234,12 @@ public class DataFrameAnalyticsManager {
                 final ThreadContext threadContext = client.threadPool().getThreadContext();
                 final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
                 try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(ML_ORIGIN)) {
+                    LOGGER.info("[{}] Started reindexing", config.getId());
                     Task reindexTask = client.executeLocally(ReindexAction.INSTANCE, reindexRequest,
                         new ContextPreservingActionListener<>(supplier, reindexCompletedListener));
                     task.setReindexingTaskId(reindexTask.getId());
+                    auditor.info(config.getId(),
+                        Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING, config.getDest().getIndex()));
                 }
             },
             reindexCompletedListener::onFailure

+ 6 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

@@ -147,6 +147,9 @@ public class AnalyticsProcessManager {
     }
 
     private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) {
+        LOGGER.info("[{}] Started loading data", processContext.config.getId());
+        auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA));
+
         DataFrameAnalyticsConfig config = processContext.config;
         DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get();
         AnalyticsProcess<AnalyticsResult> process = processContext.process.get();
@@ -159,6 +162,9 @@ public class AnalyticsProcessManager {
 
             restoreState(task, config, state, process);
 
+            LOGGER.info("[{}] Started analyzing", processContext.config.getId());
+            auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING));
+
             LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
             resultProcessor.awaitForCompletion();
             processContext.setFailureReason(resultProcessor.getFailure());

+ 5 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java

@@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
 import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
 import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
 import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
 import org.elasticsearch.xpack.core.security.user.XPackUser;
@@ -120,6 +121,10 @@ public class AnalyticsResultProcessor {
                 AnalyticsResult result = iterator.next();
                 processResult(result, resultsJoiner);
                 if (result.getRowResults() != null) {
+                    if (processedRows == 0) {
+                        LOGGER.info("[{}] Started writing results", analytics.getId());
+                        auditor.info(analytics.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS));
+                    }
                     processedRows++;
                     updateResultsProgress(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows));
                 }