Browse Source

[ML] Audit message when nightly maintenance times out (#63252)

During deletion of old ml data set the delete by query timeout to 8 hours and 
audit a job message when the nightly maintenance task times out.
David Kyle 5 years ago
parent
commit
6784e826a8

+ 13 - 5
docs/reference/settings/ml-settings.asciidoc

@@ -25,7 +25,7 @@ hardware, you must disable {ml} (by setting `xpack.ml.enabled` to `false`).
 the node as a _{ml} node_ that is capable of running jobs. Every node is a {ml}
 node by default.
 +
-If you use the `node.roles` setting, then all required roles must be explicitly  
+If you use the `node.roles` setting, then all required roles must be explicitly
 set. Consult <<modules-node>> to learn more.
 +
 IMPORTANT: On dedicated coordinating nodes or dedicated master nodes, do not set
@@ -44,9 +44,9 @@ from clients (including {kib}) also fail. For more information about disabling
 {kibana-ref}/ml-settings-kb.html[{kib} {ml} settings].
 +
 IMPORTANT: If you want to use {ml-features} in your cluster, it is recommended
-that you set `xpack.ml.enabled` to `true` on all nodes. This is the default 
-behavior. At a minimum, it must be enabled on all master-eligible nodes. If you 
-want to use {ml-features} in clients or {kib}, it must also be enabled on all 
+that you set `xpack.ml.enabled` to `true` on all nodes. This is the default
+behavior. At a minimum, it must be enabled on all master-eligible nodes. If you
+want to use {ml-features} in clients or {kib}, it must also be enabled on all
 coordinating nodes.
 
 `xpack.ml.inference_model.cache_size`::
@@ -90,7 +90,7 @@ For more information about the `model_memory_limit` property, see
 
 [[xpack.ml.max_open_jobs]]
 `xpack.ml.max_open_jobs`::
-(<<cluster-update-settings,Dynamic>>) The maximum number of jobs that can run 
+(<<cluster-update-settings,Dynamic>>) The maximum number of jobs that can run
 simultaneously on a node. Defaults to `20`. In this context, jobs include both
 {anomaly-jobs} and {dfanalytics-jobs}. The maximum number of jobs is also
 constrained by memory usage. Thus if the estimated memory usage of the jobs
@@ -100,6 +100,14 @@ dynamic setting in version 7.1. As a result, changes to its value after node
 startup are used only after every node in the cluster is running version 7.1 or
 higher. The maximum permitted value is `512`.
 
+`xpack.ml.nightly_maintenance_requests_per_second`::
+(<<cluster-update-settings,Dynamic>>) The rate at which the nightly maintenance task
+deletes expired model snapshots and results. The setting is a proxy to the
+[requests_per_second](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html#_throttling_delete_requests)
+parameter used in the Delete by query requests and controls throttling.
+Valid values must be greater than `0.0` or equal to `-1.0` where `-1.0` means a default value
+is used. Defaults to `-1.0`
+
 `xpack.ml.node_concurrent_job_allocations`::
 (<<cluster-update-settings,Dynamic>>) The maximum number of jobs that can
 concurrently be in the `opening` state on each node. Typically, jobs spend a

+ 3 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/AbstractAuditor.java

@@ -33,6 +33,9 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
 
+    // The special ID that means the message applies to all jobs/resources.
+    public static final String All_RESOURCES_ID = "";
+
     private static final Logger logger = LogManager.getLogger(AbstractAuditor.class);
     static final int MAX_BUFFER_SIZE = 1000;
 

+ 19 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java

@@ -59,6 +59,7 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
         private Float requestsPerSecond;
         private TimeValue timeout;
         private String jobId;
+        private String [] expandedJobIds;
 
         public Request() {}
 
@@ -101,6 +102,20 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
             return this;
         }
 
+        /**
+         * Not serialized, the expanded job Ids should only be used
+         * on the executing node.
+         * @return The expanded Ids in the case where {@code jobId} is not `_all`
+         * otherwise null.
+         */
+        public String [] getExpandedJobIds() {
+            return expandedJobIds;
+        }
+
+        public void setExpandedJobIds(String [] expandedJobIds) {
+            this.expandedJobIds = expandedJobIds;
+        }
+
         @Override
         public ActionRequestValidationException validate() {
             if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) {
@@ -118,12 +133,13 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
             Request request = (Request) o;
             return Objects.equals(requestsPerSecond, request.requestsPerSecond)
                 && Objects.equals(jobId, request.jobId)
+                && Objects.equals(expandedJobIds, request.expandedJobIds)
                 && Objects.equals(timeout, request.timeout);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(requestsPerSecond, timeout, jobId);
+            return Objects.hash(requestsPerSecond, timeout, jobId, expandedJobIds);
         }
 
         @Override
@@ -132,6 +148,7 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
             out.writeOptionalFloat(requestsPerSecond);
             out.writeOptionalTimeValue(timeout);
             out.writeOptionalString(jobId);
+            // expandedJobIds are set on the node and not part of serialisation
         }
     }
 
@@ -139,7 +156,7 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
 
         private static final ParseField DELETED = new ParseField("deleted");
 
-        private boolean deleted;
+        private final boolean deleted;
 
         public Response(boolean deleted) {
             this.deleted = deleted;

+ 38 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

@@ -16,12 +16,14 @@ import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
 import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -53,8 +55,6 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
 
     private static final Logger logger = LogManager.getLogger(TransportDeleteExpiredDataAction.class);
 
-    static final Duration DEFAULT_MAX_DURATION = Duration.ofHours(8);
-
     private final ThreadPool threadPool;
     private final String executor;
     private final OriginSettingClient client;
@@ -62,18 +62,21 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
     private final Clock clock;
     private final JobConfigProvider jobConfigProvider;
     private final JobResultsProvider jobResultsProvider;
+    private final AnomalyDetectionAuditor auditor;
 
     @Inject
     public TransportDeleteExpiredDataAction(ThreadPool threadPool, TransportService transportService,
                                             ActionFilters actionFilters, Client client, ClusterService clusterService,
-                                            JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider) {
+                                            JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider,
+                                            AnomalyDetectionAuditor auditor) {
         this(threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, transportService, actionFilters, client, clusterService,
-            jobConfigProvider, jobResultsProvider, Clock.systemUTC());
+            jobConfigProvider, jobResultsProvider, auditor, Clock.systemUTC());
     }
 
     TransportDeleteExpiredDataAction(ThreadPool threadPool, String executor, TransportService transportService,
                                      ActionFilters actionFilters, Client client, ClusterService clusterService,
-                                     JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider, Clock clock) {
+                                     JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider,
+                                     AnomalyDetectionAuditor auditor, Clock clock) {
         super(DeleteExpiredDataAction.NAME, transportService, actionFilters, DeleteExpiredDataAction.Request::new, executor);
         this.threadPool = threadPool;
         this.executor = executor;
@@ -82,6 +85,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
         this.clock = clock;
         this.jobConfigProvider = jobConfigProvider;
         this.jobResultsProvider = jobResultsProvider;
+        this.auditor = auditor;
     }
 
     @Override
@@ -89,7 +93,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
                              ActionListener<DeleteExpiredDataAction.Response> listener) {
         logger.info("Deleting expired data");
         Instant timeoutTime = Instant.now(clock).plus(
-            request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis())
+            request.getTimeout() == null ? Duration.ofMillis(MlDataRemover.DEFAULT_MAX_DURATION.getMillis()) :
+                Duration.ofMillis(request.getTimeout().millis())
         );
 
         TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
@@ -97,7 +102,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
         Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
         AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService);
 
-        if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(new String[]{request.getJobId()})) {
+        if (Strings.isNullOrEmpty(request.getJobId()) || Strings.isAllOrWildcard(request.getJobId())) {
             List<MlDataRemover> dataRemovers = createDataRemovers(client, taskId, auditor);
             threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
                 () -> deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier)
@@ -107,6 +112,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
                 jobBuilders -> {
                     threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
                             List<Job> jobs = jobBuilders.stream().map(Job.Builder::build).collect(Collectors.toList());
+                            String [] jobIds = jobs.stream().map(Job::getId).toArray(String[]::new);
+                            request.setExpandedJobIds(jobIds);
                             List<MlDataRemover> dataRemovers = createDataRemovers(jobs, taskId, auditor);
                             deleteExpiredData(request, dataRemovers, listener, isTimedOutSupplier);
                         }
@@ -133,10 +140,11 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
                 (float) (AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5) * numberOfDatanodes :
                 Float.POSITIVE_INFINITY;
         }
-        deleteExpiredData(dataRemoversIterator, requestsPerSec, listener, isTimedOutSupplier, true);
+        deleteExpiredData(request, dataRemoversIterator, requestsPerSec, listener, isTimedOutSupplier, true);
     }
 
-    void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
+    void deleteExpiredData(DeleteExpiredDataAction.Request request,
+                           Iterator<MlDataRemover> mlDataRemoversIterator,
                            float requestsPerSecond,
                            ActionListener<DeleteExpiredDataAction.Response> listener,
                            Supplier<Boolean> isTimedOutSupplier,
@@ -146,6 +154,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
             ActionListener<Boolean> nextListener = ActionListener.wrap(
                 booleanResponse ->
                     deleteExpiredData(
+                        request,
                         mlDataRemoversIterator,
                         requestsPerSecond,
                         listener,
@@ -164,7 +173,26 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
             if (haveAllPreviousDeletionsCompleted) {
                 logger.info("Completed deletion of expired ML data");
             } else {
-                logger.info("Halted deletion of expired ML data until next invocation");
+                if (isTimedOutSupplier.get()) {
+                    TimeValue timeoutPeriod = request.getTimeout() == null ? MlDataRemover.DEFAULT_MAX_DURATION :
+                        request.getTimeout();
+                    String msg = "Deleting expired ML data was cancelled after the timeout period of [" +
+                        timeoutPeriod  + "] was exceeded. The setting [xpack.ml.nightly_maintenance_requests_per_second] " +
+                        "controls the deletion rate, consider increasing the value to assist in pruning old data";
+                    logger.warn(msg);
+
+                    if (Strings.isNullOrEmpty(request.getJobId())
+                        || Strings.isAllOrWildcard(request.getJobId())
+                        || request.getExpandedJobIds() == null) {
+                        auditor.warning(AbstractAuditor.All_RESOURCES_ID, msg);
+                    } else {
+                        for (String jobId : request.getExpandedJobIds()) {
+                            auditor.warning(jobId, msg);
+                        }
+                    }
+                } else {
+                    logger.info("Halted deletion of expired ML data until next invocation");
+                }
             }
             listener.onResponse(new DeleteExpiredDataAction.Response(haveAllPreviousDeletionsCompleted));
         }

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java

@@ -174,6 +174,7 @@ public class ExpiredForecastsRemover implements MlDataRemover {
     private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
         DeleteByQueryRequest request = new DeleteByQueryRequest();
         request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
+        request.setTimeout(DEFAULT_MAX_DURATION);
 
         request.indices(RESULTS_INDEX_PATTERN);
         BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

@@ -123,6 +123,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
         request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE)
             // We are deleting old data, we should simply proceed as a version conflict could mean that another deletion is taking place
             .setAbortOnVersionConflict(false)
+            .setTimeout(DEFAULT_MAX_DURATION)
             .setRequestsPerSecond(requestsPerSec);
 
         request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));

+ 3 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java

@@ -7,11 +7,14 @@ package org.elasticsearch.xpack.ml.job.retention;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.SearchHit;
 
 import java.util.function.Supplier;
 
 public interface MlDataRemover {
+    TimeValue DEFAULT_MAX_DURATION = TimeValue.timeValueHours(8L);
+
     void remove(float requestsPerSecond, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
 
     /**

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

@@ -142,6 +142,7 @@ public class UnusedStateRemover implements MlDataRemover {
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setAbortOnVersionConflict(false)
             .setRequestsPerSecond(requestsPerSec)
+            .setTimeout(DEFAULT_MAX_DURATION)
             .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0])));
 
         // _doc is the most efficient sort order and will also disable scoring

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStatsRemover.java

@@ -99,6 +99,7 @@ public class UnusedStatsRemover implements MlDataRemover {
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
             .setAbortOnVersionConflict(false)
             .setRequestsPerSecond(requestsPerSec)
+            .setTimeout(DEFAULT_MAX_DURATION)
             .setQuery(dbq);
         deleteByQueryRequest.setParentTask(parentTaskId);
 

+ 45 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
+import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.junit.After;
 import org.junit.Before;
 
@@ -29,12 +30,18 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 public class TransportDeleteExpiredDataActionTests extends ESTestCase {
 
     private ThreadPool threadPool;
     private TransportDeleteExpiredDataAction transportDeleteExpiredDataAction;
+    private AnomalyDetectionAuditor auditor;
 
     /**
      * A data remover that only checks for timeouts.
@@ -56,9 +63,10 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
         TransportService transportService = mock(TransportService.class);
         Client client = mock(Client.class);
         ClusterService clusterService = mock(ClusterService.class);
+        auditor = mock(AnomalyDetectionAuditor.class);
         transportDeleteExpiredDataAction = new TransportDeleteExpiredDataAction(threadPool, ThreadPool.Names.SAME, transportService,
             new ActionFilters(Collections.emptySet()), client, clusterService, mock(JobConfigProvider.class),
-            mock(JobResultsProvider.class),
+            mock(JobResultsProvider.class), auditor,
             Clock.systemUTC());
     }
 
@@ -81,7 +89,8 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
 
         Supplier<Boolean> isTimedOutSupplier = () -> false;
 
-        transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
+        DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(null, null);
+        transportDeleteExpiredDataAction.deleteExpiredData(request, removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
 
         assertTrue(succeeded.get());
     }
@@ -101,8 +110,41 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
 
         Supplier<Boolean> isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0);
 
-        transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
+        DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(null, null);
+        request.setJobId("_all");
+        transportDeleteExpiredDataAction.deleteExpiredData(request, removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
+        assertFalse(succeeded.get());
+
+        verify(auditor, times(1)).warning("",
+            "Deleting expired ML data was cancelled after the timeout period of [8h] was exceeded. " +
+                "The setting [xpack.ml.nightly_maintenance_requests_per_second] " +
+                "controls the deletion rate, consider increasing the value to assist in pruning old data");
+        verifyNoMoreInteractions(auditor);
+    }
+
+    public void testDeleteExpiredDataIterationWithTimeout_GivenJobIds() {
+
+        final int numRemovers = randomIntBetween(2, 5);
+        AtomicInteger removersRemaining = new AtomicInteger(randomIntBetween(0, numRemovers - 1));
+
+        List<MlDataRemover> removers = Stream.generate(DummyDataRemover::new).limit(numRemovers).collect(Collectors.toList());
 
+        AtomicBoolean succeeded = new AtomicBoolean();
+        ActionListener<DeleteExpiredDataAction.Response> finalListener = ActionListener.wrap(
+            response -> succeeded.set(response.isDeleted()),
+            e -> fail(e.getMessage())
+        );
+
+        Supplier<Boolean> isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0);
+
+        DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(null, null);
+        request.setJobId("foo*");
+        request.setExpandedJobIds(new String[] {"foo1", "foo2"});
+        transportDeleteExpiredDataAction.deleteExpiredData(request, removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
         assertFalse(succeeded.get());
+
+        verify(auditor, times(1)).warning(eq("foo1"), anyString());
+        verify(auditor, times(1)).warning(eq("foo2"), anyString());
+        verifyNoMoreInteractions(auditor);
     }
 }