Browse Source

[ML] Add 'model_prune_window' field to AD job config (#75741)

Add configuration for pruning dead split fields in anomaly detection
jobs via the `model_prune_window` field for both the job creation and
update APIs.

Relates to ml-cpp/#1962
Ed Savage 4 years ago
parent
commit
5651215be1

+ 26 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/AnalysisConfig.java

@@ -51,6 +51,7 @@ public class AnalysisConfig implements ToXContentObject {
     public static final ParseField DETECTORS = new ParseField("detectors");
     public static final ParseField INFLUENCERS = new ParseField("influencers");
     public static final ParseField MULTIVARIATE_BY_FIELDS = new ParseField("multivariate_by_fields");
+    public static final ParseField MODEL_PRUNE_WINDOW = new ParseField("model_prune_window");
 
     @SuppressWarnings("unchecked")
     public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(ANALYSIS_CONFIG.getPreferredName(),
@@ -75,6 +76,8 @@ public class AnalysisConfig implements ToXContentObject {
         PARSER.declareString(Builder::setSummaryCountFieldName, SUMMARY_COUNT_FIELD_NAME);
         PARSER.declareStringArray(Builder::setInfluencers, INFLUENCERS);
         PARSER.declareBoolean(Builder::setMultivariateByFields, MULTIVARIATE_BY_FIELDS);
+        PARSER.declareString((builder, val) ->
+            builder.setModelPruneWindow(TimeValue.parseTimeValue(val, MODEL_PRUNE_WINDOW.getPreferredName())), MODEL_PRUNE_WINDOW);
     }
 
     /**
@@ -90,11 +93,13 @@ public class AnalysisConfig implements ToXContentObject {
     private final List<Detector> detectors;
     private final List<String> influencers;
     private final Boolean multivariateByFields;
+    private final TimeValue modelPruneWindow;
 
     private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List<String> categorizationFilters,
                            CategorizationAnalyzerConfig categorizationAnalyzerConfig,
                            PerPartitionCategorizationConfig perPartitionCategorizationConfig, TimeValue latency,
-                           String summaryCountFieldName, List<Detector> detectors, List<String> influencers, Boolean multivariateByFields) {
+                           String summaryCountFieldName, List<Detector> detectors, List<String> influencers, Boolean multivariateByFields,
+                           TimeValue modelPruneWindow) {
         this.detectors = Collections.unmodifiableList(detectors);
         this.bucketSpan = bucketSpan;
         this.latency = latency;
@@ -105,6 +110,7 @@ public class AnalysisConfig implements ToXContentObject {
         this.summaryCountFieldName = summaryCountFieldName;
         this.influencers = Collections.unmodifiableList(influencers);
         this.multivariateByFields = multivariateByFields;
+        this.modelPruneWindow = modelPruneWindow;
     }
 
     /**
@@ -171,6 +177,10 @@ public class AnalysisConfig implements ToXContentObject {
         return multivariateByFields;
     }
 
+    public TimeValue getModelPruneWindow() {
+        return modelPruneWindow;
+    }
+
     private static void addIfNotNull(Set<String> fields, String field) {
         if (field != null) {
             fields.add(field);
@@ -243,6 +253,9 @@ public class AnalysisConfig implements ToXContentObject {
         if (multivariateByFields != null) {
             builder.field(MULTIVARIATE_BY_FIELDS.getPreferredName(), multivariateByFields);
         }
+        if (modelPruneWindow != null) {
+            builder.field(MODEL_PRUNE_WINDOW.getPreferredName(), modelPruneWindow.getStringRep());
+        }
         builder.endObject();
         return builder;
     }
@@ -267,14 +280,15 @@ public class AnalysisConfig implements ToXContentObject {
             Objects.equals(summaryCountFieldName, that.summaryCountFieldName) &&
             Objects.equals(detectors, that.detectors) &&
             Objects.equals(influencers, that.influencers) &&
-            Objects.equals(multivariateByFields, that.multivariateByFields);
+            Objects.equals(multivariateByFields, that.multivariateByFields) &&
+            Objects.equals(modelPruneWindow, that.modelPruneWindow);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
             bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, perPartitionCategorizationConfig,
-            latency, summaryCountFieldName, detectors, influencers, multivariateByFields);
+            latency, summaryCountFieldName, detectors, influencers, multivariateByFields, modelPruneWindow);
     }
 
     public static Builder builder(List<Detector> detectors) {
@@ -293,6 +307,7 @@ public class AnalysisConfig implements ToXContentObject {
         private String summaryCountFieldName;
         private List<String> influencers = new ArrayList<>();
         private Boolean multivariateByFields;
+        private TimeValue modelPruneWindow;
 
         public Builder(List<Detector> detectors) {
             setDetectors(detectors);
@@ -310,6 +325,7 @@ public class AnalysisConfig implements ToXContentObject {
             this.summaryCountFieldName = analysisConfig.summaryCountFieldName;
             this.influencers = new ArrayList<>(analysisConfig.influencers);
             this.multivariateByFields = analysisConfig.multivariateByFields;
+            this.modelPruneWindow = analysisConfig.modelPruneWindow;
         }
 
         public Builder setDetectors(List<Detector> detectors) {
@@ -376,10 +392,16 @@ public class AnalysisConfig implements ToXContentObject {
             return this;
         }
 
+        public Builder setModelPruneWindow(TimeValue modelPruneWindow) {
+            this.modelPruneWindow = modelPruneWindow;
+            return this;
+        }
+
         public AnalysisConfig build() {
 
             return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig,
-                perPartitionCategorizationConfig, latency, summaryCountFieldName, detectors, influencers, multivariateByFields);
+                perPartitionCategorizationConfig, latency, summaryCountFieldName, detectors, influencers, multivariateByFields,
+                modelPruneWindow);
         }
     }
 }

+ 23 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/JobUpdate.java

@@ -48,6 +48,8 @@ public class JobUpdate implements ToXContentObject {
                 AnalysisConfig.PER_PARTITION_CATEGORIZATION);
         PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
         PARSER.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN);
+        PARSER.declareString((builder, val) -> builder.setModelPruneWindow(
+            TimeValue.parseTimeValue(val, AnalysisConfig.MODEL_PRUNE_WINDOW.getPreferredName())), AnalysisConfig.MODEL_PRUNE_WINDOW);
     }
 
     private final String jobId;
@@ -66,6 +68,7 @@ public class JobUpdate implements ToXContentObject {
     private final PerPartitionCategorizationConfig perPartitionCategorizationConfig;
     private final Map<String, Object> customSettings;
     private final Boolean allowLazyOpen;
+    private final TimeValue modelPruneWindow;
 
     private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
                       @Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@@ -74,7 +77,7 @@ public class JobUpdate implements ToXContentObject {
                       @Nullable Long systemAnnotationsRetentionDays, @Nullable Long modelSnapshotRetentionDays,
                       @Nullable Long dailyModelSnapshotRetentionAfterDays, @Nullable List<String> categorizationFilters,
                       @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig,
-                      @Nullable Map<String, Object> customSettings, @Nullable Boolean allowLazyOpen) {
+                      @Nullable Map<String, Object> customSettings, @Nullable Boolean allowLazyOpen, @Nullable TimeValue modelPruneWindow) {
         this.jobId = jobId;
         this.groups = groups;
         this.description = description;
@@ -91,6 +94,7 @@ public class JobUpdate implements ToXContentObject {
         this.perPartitionCategorizationConfig = perPartitionCategorizationConfig;
         this.customSettings = customSettings;
         this.allowLazyOpen = allowLazyOpen;
+        this.modelPruneWindow = modelPruneWindow;
     }
 
     public String getJobId() {
@@ -153,6 +157,10 @@ public class JobUpdate implements ToXContentObject {
         return allowLazyOpen;
     }
 
+    public TimeValue getModelPruneWindow() {
+        return modelPruneWindow;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
@@ -202,6 +210,9 @@ public class JobUpdate implements ToXContentObject {
         if (allowLazyOpen != null) {
             builder.field(Job.ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
         }
+        if (modelPruneWindow != null) {
+            // Write the explicit string representation, as AnalysisConfig#toXContent does, instead of relying on the
+            // builder's generic object handling; PARSER reads this field back through TimeValue.parseTimeValue.
+            builder.field(AnalysisConfig.MODEL_PRUNE_WINDOW.getPreferredName(), modelPruneWindow.getStringRep());
+        }
         builder.endObject();
         return builder;
     }
@@ -233,14 +244,16 @@ public class JobUpdate implements ToXContentObject {
             && Objects.equals(this.categorizationFilters, that.categorizationFilters)
             && Objects.equals(this.perPartitionCategorizationConfig, that.perPartitionCategorizationConfig)
             && Objects.equals(this.customSettings, that.customSettings)
-            && Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
+            && Objects.equals(this.allowLazyOpen, that.allowLazyOpen)
+            && Objects.equals(this.modelPruneWindow, that.modelPruneWindow);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
             backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays,
-            systemAnnotationsRetentionDays, categorizationFilters, perPartitionCategorizationConfig, customSettings, allowLazyOpen);
+            systemAnnotationsRetentionDays, categorizationFilters, perPartitionCategorizationConfig, customSettings, allowLazyOpen,
+            modelPruneWindow);
     }
 
     public static class DetectorUpdate implements ToXContentObject {
@@ -340,6 +353,7 @@ public class JobUpdate implements ToXContentObject {
         private PerPartitionCategorizationConfig perPartitionCategorizationConfig;
         private Map<String, Object> customSettings;
         private Boolean allowLazyOpen;
+        private TimeValue modelPruneWindow;
 
         /**
          * New {@link JobUpdate.Builder} object for the existing job
@@ -525,11 +539,16 @@ public class JobUpdate implements ToXContentObject {
             return this;
         }
 
+        public Builder setModelPruneWindow(TimeValue modelPruneWindow) {
+            this.modelPruneWindow = modelPruneWindow;
+            return this;
+        }
+
         public JobUpdate build() {
             return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
                 renormalizationWindowDays, resultsRetentionDays, systemAnnotationsRetentionDays, modelSnapshotRetentionDays,
                 dailyModelSnapshotRetentionAfterDays, categorizationFilters, perPartitionCategorizationConfig, customSettings,
-                allowLazyOpen);
+                allowLazyOpen, modelPruneWindow);
         }
     }
 }

+ 16 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/AnalysisConfigTests.java

@@ -86,6 +86,9 @@ public class AnalysisConfigTests extends AbstractXContentTestCase<AnalysisConfig
         if (randomBoolean()) {
             builder.setMultivariateByFields(randomBoolean());
         }
+        if (randomBoolean()) {
+            builder.setModelPruneWindow(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
+        }
 
         builder.setInfluencers(Arrays.asList(generateRandomStringArray(10, 10, false)));
         return builder;
@@ -195,6 +198,19 @@ public class AnalysisConfigTests extends AbstractXContentTestCase<AnalysisConfig
         assertFalse(config2.equals(config1));
     }
 
+    public void testEquals_GivenDifferentModelPruneWindow() {
+        // Both configs start from createConfigBuilder(); only the model prune window is set explicitly.
+        AnalysisConfig thirtyDayWindow = createConfigBuilder().setModelPruneWindow(TimeValue.timeValueDays(30)).build();
+        AnalysisConfig sixtyDayWindow = createConfigBuilder().setModelPruneWindow(TimeValue.timeValueDays(60)).build();
+
+        // Inequality has to hold in both directions.
+        assertFalse(thirtyDayWindow.equals(sixtyDayWindow));
+        assertFalse(sixtyDayWindow.equals(thirtyDayWindow));
+    }
+
     public void testEquals_GivenSummaryCountField() {
         AnalysisConfig.Builder builder = createConfigBuilder();
         builder.setSummaryCountFieldName("foo");

+ 4 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobUpdateTests.java

@@ -26,7 +26,7 @@ public class JobUpdateTests extends AbstractXContentTestCase<JobUpdate> {
 
     /**
      * Creates a completely random update when the job is null
-     * or a random update that is is valid for the given job
+     * or a random update that is valid for the given job
      */
     public static JobUpdate createRandom(String jobId) {
         JobUpdate.Builder update = new JobUpdate.Builder(jobId);
@@ -77,6 +77,9 @@ public class JobUpdateTests extends AbstractXContentTestCase<JobUpdate> {
         if (randomBoolean()) {
             update.setAllowLazyOpen(randomBoolean());
         }
+        if (randomBoolean()) {
+            update.setModelPruneWindow(TimeValue.timeValueDays(randomIntBetween(1, 100)));
+        }
 
         return update.build();
     }

+ 4 - 0
docs/reference/ml/anomaly-detection/apis/put-job.asciidoc

@@ -181,6 +181,10 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=influencers]
 (time units)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=latency]
 
+`model_prune_window`:::
+(Optional, <<time-units,time units>>)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=model-prune-window]
+
 `multivariate_by_fields`:::
 (Boolean)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=multivariate-by-fields]

+ 4 - 0
docs/reference/ml/anomaly-detection/apis/update-job.asciidoc

@@ -196,6 +196,10 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=model-plot-config-terms]
 ====
 //End model_plot_config
 
+`model_prune_window`::
+(<<time-units,time units>>)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=model-prune-window]
+
 `model_snapshot_retention_days`::
 (long)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=model-snapshot-retention-days]

+ 6 - 0
docs/reference/ml/ml-shared.asciidoc

@@ -1229,6 +1229,12 @@ applied. For example, "CPU,NetworkIn,DiskWrites". Wildcards are not supported.
 Only the specified `terms` can be viewed when using the Single Metric Viewer.
 end::model-plot-config-terms[]
 
+tag::model-prune-window[]
+Advanced configuration option. Affects the pruning of models that have not been
+updated for the given time duration. The value must be a multiple of `bucket_span`
+and at least twice the `bucket_span`. If it is not set, no default value is applied.
+end::model-prune-window[]
+
 tag::model-snapshot-id[]
 A numerical character string that uniquely identifies the model snapshot. For
 example, `1575402236000 `.

+ 71 - 10
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfig.java

@@ -6,19 +6,20 @@
  */
 package org.elasticsearch.xpack.core.ml.job.config;
 
-import org.elasticsearch.common.xcontent.ParseField;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ParseField;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xpack.core.common.time.TimeUtils;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.core.common.time.TimeUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,7 +43,7 @@ import java.util.stream.Collectors;
  * <p>
  * The configuration can contain multiple detectors, a new anomaly detector will
  * be created for each detector configuration. The fields
- * <code>bucketSpan, summaryCountFieldName and categorizationFieldName</code>
+ * <code>bucketSpan, modelPruneWindow, summaryCountFieldName and categorizationFieldName</code>
  * apply to all detectors.
  * <p>
  * If a value has not been set it will be <code>null</code>
@@ -55,6 +56,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
      */
     public static final ParseField ANALYSIS_CONFIG = new ParseField("analysis_config");
     public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
+    public static final ParseField MODEL_PRUNE_WINDOW = new ParseField("model_prune_window");
     public static final ParseField CATEGORIZATION_FIELD_NAME = new ParseField("categorization_field_name");
     public static final ParseField CATEGORIZATION_FILTERS = new ParseField("categorization_filters");
     public static final ParseField CATEGORIZATION_ANALYZER = CategorizationAnalyzerConfig.CATEGORIZATION_ANALYZER;
@@ -72,6 +74,9 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
     public static final ConstructingObjectParser<AnalysisConfig.Builder, Void> LENIENT_PARSER = createParser(true);
     public static final ConstructingObjectParser<AnalysisConfig.Builder, Void> STRICT_PARSER = createParser(false);
 
+    // The minimum number of buckets considered acceptable for the model_prune_window field
+    public static final long MINIMUM_MODEL_PRUNE_WINDOW_BUCKETS = 2;
+
     @SuppressWarnings("unchecked")
     private static ConstructingObjectParser<AnalysisConfig.Builder, Void> createParser(boolean ignoreUnknownFields) {
         ConstructingObjectParser<AnalysisConfig.Builder, Void> parser = new ConstructingObjectParser<>(ANALYSIS_CONFIG.getPreferredName(),
@@ -96,6 +101,8 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
         parser.declareString(Builder::setSummaryCountFieldName, SUMMARY_COUNT_FIELD_NAME);
         parser.declareStringArray(Builder::setInfluencers, INFLUENCERS);
         parser.declareBoolean(Builder::setMultivariateByFields, MULTIVARIATE_BY_FIELDS);
+        parser.declareString((builder, val) ->
+            builder.setModelPruneWindow(TimeValue.parseTimeValue(val, MODEL_PRUNE_WINDOW.getPreferredName())), MODEL_PRUNE_WINDOW);
 
         return parser;
     }
@@ -113,11 +120,14 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
     private final List<Detector> detectors;
     private final List<String> influencers;
     private final Boolean multivariateByFields;
+    private final TimeValue modelPruneWindow;
+
 
     private AnalysisConfig(TimeValue bucketSpan, String categorizationFieldName, List<String> categorizationFilters,
                            CategorizationAnalyzerConfig categorizationAnalyzerConfig,
                            PerPartitionCategorizationConfig perPartitionCategorizationConfig, TimeValue latency,
-                           String summaryCountFieldName, List<Detector> detectors, List<String> influencers, Boolean multivariateByFields) {
+                           String summaryCountFieldName, List<Detector> detectors, List<String> influencers, Boolean multivariateByFields,
+                           TimeValue modelPruneWindow) {
         this.detectors = detectors;
         this.bucketSpan = bucketSpan;
         this.latency = latency;
@@ -128,6 +138,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
         this.summaryCountFieldName = summaryCountFieldName;
         this.influencers = Collections.unmodifiableList(influencers);
         this.multivariateByFields = multivariateByFields;
+        this.modelPruneWindow = modelPruneWindow;
     }
 
     public AnalysisConfig(StreamInput in) throws IOException {
@@ -142,6 +153,11 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
         influencers = Collections.unmodifiableList(in.readStringList());
 
         multivariateByFields = in.readOptionalBoolean();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            modelPruneWindow = in.readOptionalTimeValue();
+        } else {
+            modelPruneWindow = null;
+        }
     }
 
     @Override
@@ -162,6 +178,10 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
         out.writeStringCollection(influencers);
 
         out.writeOptionalBoolean(multivariateByFields);
+
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalTimeValue(modelPruneWindow);
+        }
     }
 
     /**
@@ -261,6 +281,10 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
         return multivariateByFields;
     }
 
+    public TimeValue getModelPruneWindow() {
+        return modelPruneWindow;
+    }
+
     /**
      * Return the set of fields required by the analysis.
      * These are the influencer fields, metric field, partition field,
@@ -360,6 +384,10 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
         if (multivariateByFields != null) {
             builder.field(MULTIVARIATE_BY_FIELDS.getPreferredName(), multivariateByFields);
         }
+        if (modelPruneWindow != null) {
+            builder.field(MODEL_PRUNE_WINDOW.getPreferredName(), modelPruneWindow.getStringRep());
+        }
+
         builder.endObject();
         return builder;
     }
@@ -378,14 +406,16 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
                 Objects.equals(summaryCountFieldName, that.summaryCountFieldName) &&
                 Objects.equals(detectors, that.detectors) &&
                 Objects.equals(influencers, that.influencers) &&
-                Objects.equals(multivariateByFields, that.multivariateByFields);
+                Objects.equals(multivariateByFields, that.multivariateByFields) &&
+                Objects.equals(modelPruneWindow, that.modelPruneWindow);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
-                bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig, perPartitionCategorizationConfig,
-                latency, summaryCountFieldName, detectors, influencers, multivariateByFields);
+                bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig,
+                perPartitionCategorizationConfig, latency, summaryCountFieldName, detectors, influencers, multivariateByFields,
+                modelPruneWindow);
     }
 
     public static class Builder {
@@ -402,6 +432,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
         private String summaryCountFieldName;
         private List<String> influencers = new ArrayList<>();
         private Boolean multivariateByFields;
+        private TimeValue modelPruneWindow;
 
         public Builder(List<Detector> detectors) {
             setDetectors(detectors);
@@ -419,6 +450,7 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
             this.summaryCountFieldName = analysisConfig.summaryCountFieldName;
             this.influencers = new ArrayList<>(analysisConfig.influencers);
             this.multivariateByFields = analysisConfig.multivariateByFields;
+            this.modelPruneWindow = analysisConfig.modelPruneWindow;
         }
 
         public Builder setDetectors(List<Detector> detectors) {
@@ -489,10 +521,15 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
             return this;
         }
 
+        public Builder setModelPruneWindow(TimeValue modelPruneWindow) {
+            this.modelPruneWindow = modelPruneWindow;
+            return this;
+        }
+
         /**
          * Checks the configuration is valid
          * <ol>
-         * <li>Check that if non-null BucketSpan and Latency are &gt;= 0</li>
+         * <li>Check that if non-null BucketSpan and Latency are &gt;= 0, and ModelPruneWindow is a multiple of at least 2 BucketSpans</li>
          * <li>Check that if non-null Latency is &lt;= MAX_LATENCY</li>
          * <li>Check there is at least one detector configured</li>
          * <li>Check all the detectors are configured correctly</li>
@@ -503,6 +540,9 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
          */
         public AnalysisConfig build() {
             TimeUtils.checkPositiveMultiple(bucketSpan, TimeUnit.SECONDS, BUCKET_SPAN);
+
+            verifyModelPruneWindow();
+
             if (latency != null) {
                 TimeUtils.checkNonNegativeMultiple(latency, TimeUnit.SECONDS, LATENCY);
             }
@@ -521,7 +561,28 @@ public class AnalysisConfig implements ToXContentObject, Writeable {
             verifyNoInconsistentNestedFieldNames();
 
             return new AnalysisConfig(bucketSpan, categorizationFieldName, categorizationFilters, categorizationAnalyzerConfig,
-                perPartitionCategorizationConfig, latency, summaryCountFieldName, detectors, influencers, multivariateByFields);
+                perPartitionCategorizationConfig, latency, summaryCountFieldName, detectors, influencers, multivariateByFields,
+                modelPruneWindow);
+        }
+
+        /**
+         * Checks that {@code modelPruneWindow}, when set, is a whole multiple of {@code bucketSpan}
+         * and covers at least {@code MINIMUM_MODEL_PRUNE_WINDOW_BUCKETS} buckets.
+         * A {@code null} window is accepted without further checks.
+         * Invoked from {@code build()} after the bucket span check, so {@code bucketSpan} is
+         * expected to be positive here.
+         * Violations are reported with the exception created by {@code ExceptionsHelper.badRequestException}.
+         */
+        private void verifyModelPruneWindow() {
+            if (modelPruneWindow == null) {
+                return;
+            }
+
+            // Compare in milliseconds rather than whole seconds: converting to seconds drops any sub-second
+            // remainder, which would let a window such as "2500ms" pass as a multiple of a "1s" bucket span.
+            long modelPruneWindowMillis = modelPruneWindow.millis();
+            long bucketSpanMillis = bucketSpan.millis();
+
+            if (modelPruneWindowMillis % bucketSpanMillis != 0) {
+                throw ExceptionsHelper.badRequestException(MODEL_PRUNE_WINDOW.getPreferredName() + " [" + modelPruneWindow.toString() + "]"
+                    + " must be a multiple of " + BUCKET_SPAN.getPreferredName() + " [" + bucketSpan.toString() + "]");
+            }
+
+            if (modelPruneWindowMillis / bucketSpanMillis < MINIMUM_MODEL_PRUNE_WINDOW_BUCKETS) {
+                throw ExceptionsHelper.badRequestException(MODEL_PRUNE_WINDOW.getPreferredName() + " [" + modelPruneWindow.toString() + "]"
+                    + " must be at least " + MINIMUM_MODEL_PRUNE_WINDOW_BUCKETS + " times greater than " + BUCKET_SPAN.getPreferredName()
+                    + " [" + bucketSpan.toString() + "]");
+            }
         }
 
         private void verifyConfigConsistentWithPerPartitionCategorization() {

+ 40 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java

@@ -61,6 +61,8 @@ public class JobUpdate implements Writeable, ToXContentObject {
                     AnalysisConfig.PER_PARTITION_CATEGORIZATION);
             parser.declareField(Builder::setCustomSettings, (p, c) -> p.map(), Job.CUSTOM_SETTINGS, ObjectParser.ValueType.OBJECT);
             parser.declareBoolean(Builder::setAllowLazyOpen, Job.ALLOW_LAZY_OPEN);
+            parser.declareString((builder, val) -> builder.setModelPruneWindow(
+                TimeValue.parseTimeValue(val, AnalysisConfig.MODEL_PRUNE_WINDOW.getPreferredName())), AnalysisConfig.MODEL_PRUNE_WINDOW);
         }
         // These fields should not be set by a REST request
         INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
@@ -91,6 +93,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
     private final Boolean clearJobFinishTime;
     private final Boolean allowLazyOpen;
     private final Blocked blocked;
+    private final TimeValue modelPruneWindow;
 
     private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
                       @Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@@ -101,7 +104,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
                       @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig,
                       @Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
                       @Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime,
-                      @Nullable Boolean allowLazyOpen, @Nullable Blocked blocked) {
+                      @Nullable Boolean allowLazyOpen, @Nullable Blocked blocked, @Nullable TimeValue modelPruneWindow) {
         this.jobId = jobId;
         this.groups = groups;
         this.description = description;
@@ -123,6 +126,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
         this.clearJobFinishTime = clearJobFinishTime;
         this.allowLazyOpen = allowLazyOpen;
         this.blocked = blocked;
+        this.modelPruneWindow = modelPruneWindow;
     }
 
     public JobUpdate(StreamInput in) throws IOException {
@@ -164,6 +168,12 @@ public class JobUpdate implements Writeable, ToXContentObject {
         }
         allowLazyOpen = in.readOptionalBoolean();
         blocked = in.readOptionalWriteable(Blocked::new);
+
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            modelPruneWindow = in.readOptionalTimeValue();
+        } else {
+            modelPruneWindow = null;
+        }
     }
 
     @Override
@@ -206,6 +216,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
         }
         out.writeOptionalBoolean(allowLazyOpen);
         out.writeOptionalWriteable(blocked);
+
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalTimeValue(modelPruneWindow);
+        }
     }
 
     public String getJobId() {
@@ -296,6 +310,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
         return blocked;
     }
 
+    public TimeValue getModelPruneWindow() {
+        return modelPruneWindow;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
@@ -360,6 +378,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
         if (blocked != null) {
             builder.field(Job.BLOCKED.getPreferredName(), blocked);
         }
+        if (modelPruneWindow != null) {
+            // Write the explicit string representation, as AnalysisConfig#toXContent does, instead of relying on the
+            // builder's generic object handling; the parser reads this field back through TimeValue.parseTimeValue.
+            builder.field(AnalysisConfig.MODEL_PRUNE_WINDOW.getPreferredName(), modelPruneWindow.getStringRep());
+        }
         builder.endObject();
         return builder;
     }
@@ -420,6 +441,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
         if (allowLazyOpen != null) {
             updateFields.add(Job.ALLOW_LAZY_OPEN.getPreferredName());
         }
+        if (modelPruneWindow != null) {
+            updateFields.add(AnalysisConfig.MODEL_PRUNE_WINDOW.getPreferredName());
+        }
         return updateFields;
     }
 
@@ -518,6 +542,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
         if (blocked != null) {
             builder.setBlocked(blocked);
         }
+        if (modelPruneWindow != null) {
+            newAnalysisConfig.setModelPruneWindow(modelPruneWindow);
+        }
 
         builder.setAnalysisConfig(newAnalysisConfig);
         return builder.build();
@@ -547,7 +574,8 @@ public class JobUpdate implements Writeable, ToXContentObject {
                 && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion()))
                 && (clearJobFinishTime == null || clearJobFinishTime == false || job.getFinishedTime() == null)
                 && (allowLazyOpen == null || Objects.equals(allowLazyOpen, job.allowLazyOpen()))
-                && (blocked == null || Objects.equals(blocked, job.getBlocked()));
+                && (blocked == null || Objects.equals(blocked, job.getBlocked()))
+                && (modelPruneWindow == null || Objects.equals(modelPruneWindow, job.getAnalysisConfig().getModelPruneWindow()));
     }
 
     boolean updatesDetectors(Job job) {
@@ -600,7 +628,8 @@ public class JobUpdate implements Writeable, ToXContentObject {
                 && Objects.equals(this.jobVersion, that.jobVersion)
                 && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime)
                 && Objects.equals(this.allowLazyOpen, that.allowLazyOpen)
-                && Objects.equals(this.blocked, that.blocked);
+                && Objects.equals(this.blocked, that.blocked)
+                && Objects.equals(this.modelPruneWindow, that.modelPruneWindow);
     }
 
     @Override
@@ -608,7 +637,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
         return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
                 backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays,
                 systemAnnotationsRetentionDays, categorizationFilters, perPartitionCategorizationConfig, customSettings, modelSnapshotId,
-                modelSnapshotMinVersion, jobVersion, clearJobFinishTime, allowLazyOpen, blocked);
+                modelSnapshotMinVersion, jobVersion, clearJobFinishTime, allowLazyOpen, blocked, modelPruneWindow);
     }
 
     public static class DetectorUpdate implements Writeable, ToXContentObject {
@@ -725,6 +754,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
         private Boolean clearJobFinishTime;
         private Boolean allowLazyOpen;
         private Blocked blocked;
+        private TimeValue modelPruneWindow;
 
         public Builder(String jobId) {
             this.jobId = jobId;
@@ -845,11 +875,16 @@ public class JobUpdate implements Writeable, ToXContentObject {
             return this;
         }
 
+        public Builder setModelPruneWindow(TimeValue modelPruneWindow) { // null (the initial value) means "do not update"
+            this.modelPruneWindow = modelPruneWindow;
+            return this; // returns this builder so calls can be chained
+        }
+
         public JobUpdate build() {
             return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
                     renormalizationWindowDays, resultsRetentionDays, systemAnnotationsRetentionDays, modelSnapshotRetentionDays,
                     dailyModelSnapshotRetentionAfterDays, categorizationFilters, perPartitionCategorizationConfig, customSettings,
-                    modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime, allowLazyOpen, blocked);
+                    modelSnapshotId, modelSnapshotMinVersion, jobVersion, clearJobFinishTime, allowLazyOpen, blocked, modelPruneWindow);
         }
     }
 }

+ 3 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json

@@ -180,6 +180,9 @@
       },
       "analysis_config" : {
         "properties" : {
+          "model_prune_window" : {
+            "type" : "keyword"
+          },
           "bucket_span" : {
             "type" : "keyword"
           },

+ 27 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java

@@ -53,6 +53,11 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
         if (randomBoolean()) {
             TimeValue bucketSpan = TimeValue.timeValueSeconds(randomIntBetween(1, 1_000));
             builder.setBucketSpan(bucketSpan);
+
+            // There is a dependency between model_prune_window and bucket_span: model_prune_window must be
+            // a multiple of bucket_span and at least twice its size.
+            builder.setModelPruneWindow(TimeValue.timeValueSeconds(randomIntBetween(2, 1_000) * bucketSpan.seconds()));
+
         }
         if (isCategorization) {
             builder.setCategorizationFieldName(randomAlphaOfLength(10));
@@ -419,6 +424,19 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
         assertFalse(config2.equals(config1));
     }
 
+    public void testEquals_GivenDifferentModelPruneWindow() { // configs with different model_prune_window must not be equal
+        AnalysisConfig.Builder builder = createConfigBuilder();
+        builder.setModelPruneWindow(TimeValue.timeValueDays(14));
+        AnalysisConfig config1 = builder.build();
+
+        builder = createConfigBuilder(); // fresh builder for the second config
+        builder.setModelPruneWindow(TimeValue.timeValueDays(28));
+        AnalysisConfig config2 = builder.build();
+
+        assertFalse(config1.equals(config2)); // inequality is checked in both directions
+        assertFalse(config2.equals(config1));
+    }
+
     public void testEquals_GivenSummaryCountField() {
         AnalysisConfig.Builder builder = createConfigBuilder();
         builder.setSummaryCountFieldName("foo");
@@ -483,6 +501,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
                 CategorizationAnalyzerConfig.buildDefaultCategorizationAnalyzer(Collections.singletonList("foo")));
         builder.setInfluencers(Collections.singletonList("myInfluencer"));
         builder.setLatency(TimeValue.timeValueSeconds(3600));
+        builder.setModelPruneWindow(TimeValue.timeValueDays(30));
         builder.setSummaryCountFieldName("sumCount");
         return builder.build();
     }
@@ -701,6 +720,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
         AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors);
         analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
         analysisConfig.setLatency(TimeValue.ZERO);
+        analysisConfig.setModelPruneWindow(TimeValue.timeValueHours(3));
         return analysisConfig;
     }
 
@@ -710,6 +730,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
         AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
         analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
         analysisConfig.setLatency(TimeValue.ZERO);
+        analysisConfig.setModelPruneWindow(TimeValue.timeValueHours(3));
         analysisConfig.setCategorizationFieldName("msg");
         return analysisConfig;
     }
@@ -727,7 +748,12 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase<AnalysisCon
             builder.setDetectors(detectors);
             break;
         case 1:
-            builder.setBucketSpan(new TimeValue(instance.getBucketSpan().millis() + (between(1, 1000) * 1000)));
+            TimeValue bucketSpan = new TimeValue(instance.getBucketSpan().millis() + (between(1, 1000) * 1000));
+            builder.setBucketSpan(bucketSpan);
+
+            // There is a dependency between model_prune_window and bucket_span: model_prune_window must be
+            // a multiple of bucket_span and at least twice its size.
+            builder.setModelPruneWindow(new TimeValue(between(2, 1000) * bucketSpan.millis()));
             break;
         case 2:
             if (instance.getLatency() == null) {

+ 6 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java

@@ -136,6 +136,10 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
         if (useInternalParser && randomBoolean()) {
             update.setBlocked(BlockedTests.createRandom());
         }
+        if (randomBoolean() && job != null) {
+            update.setModelPruneWindow(TimeValue.timeValueSeconds(TimeValue.timeValueSeconds(randomIntBetween(2, 100)).seconds()
+                * job.getAnalysisConfig().getBucketSpan().seconds()));
+        }
 
         return update.build();
     }
@@ -266,6 +270,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
         updateBuilder.setCustomSettings(customSettings);
         updateBuilder.setModelSnapshotId(randomAlphaOfLength(10));
         updateBuilder.setJobVersion(VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
+        updateBuilder.setModelPruneWindow(TimeValue.timeValueDays(randomIntBetween(1, 100)));
         JobUpdate update = updateBuilder.build();
 
         Job.Builder jobBuilder = new Job.Builder("foo");
@@ -300,6 +305,7 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
         assertEquals(update.getCustomSettings(), updatedJob.getCustomSettings());
         assertEquals(update.getModelSnapshotId(), updatedJob.getModelSnapshotId());
         assertEquals(update.getJobVersion(), updatedJob.getJobVersion());
+        assertEquals(update.getModelPruneWindow(), updatedJob.getAnalysisConfig().getModelPruneWindow());
         for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
             Detector updatedDetector = updatedJob.getAnalysisConfig().getDetectors().get(detectorUpdate.getDetectorIndex());
             assertNotNull(updatedDetector);

+ 94 - 1
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/jobs_crud.yml

@@ -109,6 +109,56 @@
   - match: { job_id: "job-model-memory-limit-as-number" }
   - match: { analysis_limits.model_memory_limit: "2048mb" }
 
+---
+"Test put job with model_prune_window":
+  - do:
+      ml.put_job:
+        job_id: job-model-prune-window
+        body:  >
+          {
+            "analysis_config" : {
+                "bucket_span": "15m",
+                "detectors" :[{"function":"count"}],
+                "model_prune_window": "14d"
+            },
+            "data_description" : {
+            }
+          }
+  - match: { job_id: "job-model-prune-window" }
+  - match: { analysis_config.bucket_span: "15m" }
+  - match: { analysis_config.model_prune_window: "14d" }
+
+  - do:
+      catch: /model_prune_window \[29m\] must be a multiple of bucket_span \[15m\]/
+      ml.put_job:
+        job_id: job-invalid-model-prune-window
+        body:  >
+          {
+            "analysis_config" : {
+                "bucket_span": "15m",
+                "detectors" :[{"function":"count"}],
+                "model_prune_window": "29m"
+            },
+            "data_description" : {
+            }
+          }
+
+  - do:
+      catch: /model_prune_window \[15m\] must be at least 2 times greater than bucket_span \[15m\]/
+      ml.put_job:
+        job_id: job-model-prune-window
+        body:  >
+          {
+            "analysis_config" : {
+                "bucket_span": "15m",
+                "detectors" :[{"function":"count"}],
+                "model_prune_window": "15m"
+            },
+            "data_description" : {
+            }
+          }
+
+
 ---
 "Test put job with model_memory_limit as string and lazy open":
   - skip:
@@ -312,7 +362,8 @@
                 "detectors" :[{"function":"mean","field_name":"responsetime","by_field_name":"airline"},
                     {"function":"count","by_field_name":"mlcategory"}],
                 "categorization_field_name": "some_category",
-                "categorization_filters" : ["cat1.*", "cat2.*"]
+                "categorization_filters" : ["cat1.*", "cat2.*"],
+                "model_prune_window": "30d"
             },
             "data_description" : {
             },
@@ -341,6 +392,8 @@
   - match: { analysis_config.categorization_analyzer.char_filter.0: "first_non_blank_line" }
   - match: { analysis_config.categorization_analyzer.char_filter.1.pattern: "cat1.*" }
   - match: { analysis_config.categorization_analyzer.char_filter.2.pattern: "cat2.*" }
+  - match: { analysis_config.bucket_span: "5m" }
+  - match: { analysis_config.model_prune_window: "30d" }
 
   - do:
       ml.open_job:
@@ -373,6 +426,7 @@
                 "description": "updated description"
               }
             ],
+            "model_prune_window": "60d",
             "model_plot_config": {
               "enabled": false,
               "terms": "foobar"
@@ -402,6 +456,8 @@
   - match: { model_snapshot_retention_days: 30 }
   - match: { daily_model_snapshot_retention_after_days: 2 }
   - match: { results_retention_days: 40 }
+  - match: { analysis_config.bucket_span: "5m" }
+  - match: { analysis_config.model_prune_window: "60d" }
   - match: { system_annotations_retention_days: 50 }
 
   - do:
@@ -431,6 +487,43 @@
           }
   - match: { analysis_limits.model_memory_limit: "20mb" }
 
+  - do:
+      ml.update_job:
+        job_id: jobs-crud-update-job
+        body:  >
+          {
+              "model_prune_window": "12h"
+          }
+  - match: { analysis_config.model_prune_window: "12h" }
+
+  - do:
+      catch: /model_prune_window \[5m\] must be at least 2 times greater than bucket_span \[5m\]/
+      ml.update_job:
+        job_id: jobs-crud-update-job
+        body:  >
+          {
+              "model_prune_window": "5m"
+          }
+
+  - do:
+      catch: /model_prune_window \[2m\] must be a multiple of bucket_span \[5m\]/
+      ml.update_job:
+        job_id: jobs-crud-update-job
+        body:  >
+          {
+              "model_prune_window": "2m"
+          }
+
+  - do:
+      catch: /\[job_update\] failed to parse field \[model_prune_window\]/
+      ml.update_job:
+        job_id: jobs-crud-update-job
+        body:  >
+          {
+              "model_prune_window": "8w"
+          }
+  - match: { analysis_config.model_prune_window: null }
+
   - do:
       ml.update_job:
         job_id: jobs-crud-update-job