Browse Source

[ML] only persist progress if it has changed (#62123)

* [ML] only persist progress if it has changed

We already search for the previously stored progress document.

For optimization purposes, and to prevent restoring the same
progress after a failed analytics job is stopped,
this commit does an equality check between the previously stored progress and current progress
If the progress has changed, persistence continues as normal.
Benjamin Trent 5 years ago
parent
commit
c8707eadd8

+ 0 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

@@ -779,7 +779,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
         assertThat(getOnlyElement(getAnalytics(jobId)).getDescription(), is(equalTo("updated-description-2")));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61913")
     public void testTooLowConfiguredMemoryStillStarts() throws Exception {
         initialize("low_memory_analysis");
         indexData(sourceIndex, 10_000, 0, NESTED_FIELD);

+ 0 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

@@ -93,7 +93,6 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
         for (DataFrameAnalyticsConfig config : analytics) {
             try {
                 assertThat(deleteAnalytics(config.getId()).isAcknowledged(), is(true));
-                assertThat(searchStoredProgress(config.getId()).getHits().getTotalHits().value, equalTo(0L));
             } catch (Exception e) {
                 // just log and ignore
                 logger.error(new ParameterizedMessage("[{}] Could not clean up analytics job config", config.getId()), e);

+ 17 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

@@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.watcher.watch.Payload;
 import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
 import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
 import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
+import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils;
 
 import java.util.List;
 import java.util.Map;
@@ -309,17 +310,31 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
         ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap(
             searchResponse -> {
                 String indexOrAlias = AnomalyDetectorsIndex.jobStateIndexWriteAlias();
+                StoredProgress previous = null;
                 if (searchResponse.getHits().getHits().length > 0) {
                     indexOrAlias = searchResponse.getHits().getHits()[0].getIndex();
+                    try {
+                        previous = MlParserUtils.parse(searchResponse.getHits().getHits()[0], StoredProgress.PARSER);
+                    } catch (Exception ex) {
+                        LOGGER.warn(new ParameterizedMessage("[{}] failed to parse previously stored progress", jobId), ex);
+                    }
                 }
+
+                List<PhaseProgress> progress = statsHolder.getProgressTracker().report();
+                final StoredProgress progressToStore = new StoredProgress(progress);
+                if (progressToStore.equals(previous)) {
+                    LOGGER.debug("[{}] new progress is the same as previously persisted progress. Skipping storage.", jobId);
+                    runnable.run();
+                    return;
+                }
+
                 IndexRequest indexRequest = new IndexRequest(indexOrAlias)
                     .id(progressDocId)
                     .setRequireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias))
                     .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-                List<PhaseProgress> progress = statsHolder.getProgressTracker().report();
                 try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
                     LOGGER.debug("[{}] Persisting progress is: {}", jobId, progress);
-                    new StoredProgress(progress).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
+                    progressToStore.toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
                     indexRequest.source(jsonBuilder);
                 }
                 executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, indexProgressDocListener);