
[ML] refactor internal datafeed management (#74018)

This unifies object management between datafeeds and anomaly detection jobs: datafeed create/read/update/delete logic moves out of the individual transport actions into a new DatafeedManager, mirroring how JobManager already owns anomaly job management.
Benjamin Trent · 4 years ago · commit a5adc7df9c
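
At a glance, each datafeed transport action shrinks to a thin shell over the shared manager. A condensed sketch of the post-refactor shape, using the names from the diff below (not a complete class):

    // Sketch only: validation, persistence, and cluster-state checks now live
    // in DatafeedManager; the transport action just forwards the request.
    @Override
    protected void masterOperation(Task task, PutDatafeedAction.Request request, ClusterState state,
                                   ActionListener<PutDatafeedAction.Response> listener) {
        datafeedManager.putDatafeed(request, state, licenseState, securityContext, threadPool, listener);
    }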

+ 14 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -258,6 +258,7 @@ import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigAutoUpdater;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedContextProvider;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
 import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
@@ -699,7 +700,17 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
             threadPool,
             client,
             notifier,
-            xContentRegistry);
+            xContentRegistry,
+            indexNameExpressionResolver
+        );
+        DatafeedManager datafeedManager = new DatafeedManager(
+            datafeedConfigProvider,
+            jobConfigProvider,
+            xContentRegistry,
+            clusterService,
+            settings,
+            client
+        );
 
         // special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager if ML is enabled
         JobManagerHolder jobManagerHolder = new JobManagerHolder(jobManager);
@@ -852,7 +863,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
                 autodetectProcessManager,
                 new MlInitializationService(settings, threadPool, clusterService, client, mlAssignmentNotifier),
                 jobDataCountsPersister,
-            datafeedRunner,
+                datafeedRunner,
+                datafeedManager,
                 anomalyDetectionAuditor,
                 dataFrameAnalyticsAuditor,
                 inferenceAuditor,

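Constructing DatafeedManager once in MachineLearning#createComponents and returning it with the other components is what lets the transport actions below receive it through their @Inject constructors, instead of each action building its own DatafeedConfigProvider. A minimal sketch of that wiring, assuming the usual Plugin#createComponents contract:

    // Objects returned from Plugin#createComponents are registered with the
    // dependency injector, so "DatafeedManager datafeedManager" can appear as
    // an @Inject constructor parameter in the transport actions changed below.
    DatafeedManager datafeedManager = new DatafeedManager(
        datafeedConfigProvider, jobConfigProvider, xContentRegistry, clusterService, settings, client);
    // ...then included in the components list returned above (second hunk).
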
+ 7 - 43
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

@@ -19,7 +19,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.Task;
@@ -28,12 +27,9 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
-import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
-import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
-import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
-import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -41,8 +37,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 public class TransportDeleteDatafeedAction extends AcknowledgedTransportMasterNodeAction<DeleteDatafeedAction.Request> {
 
     private final Client client;
-    private final DatafeedConfigProvider datafeedConfigProvider;
-    private final ClusterService clusterService;
+    private final DatafeedManager datafeedManager;
     private final PersistentTasksService persistentTasksService;
     private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
 
@@ -51,14 +46,13 @@ public class TransportDeleteDatafeedAction extends AcknowledgedTransportMasterNo
                                          ThreadPool threadPool, ActionFilters actionFilters,
                                          IndexNameExpressionResolver indexNameExpressionResolver,
                                          Client client, PersistentTasksService persistentTasksService,
-                                         NamedXContentRegistry xContentRegistry) {
+                                         DatafeedManager datafeedManager) {
         super(DeleteDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
                 DeleteDatafeedAction.Request::new, indexNameExpressionResolver, ThreadPool.Names.SAME);
         this.client = client;
-        this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
         this.persistentTasksService = persistentTasksService;
-        this.clusterService = clusterService;
         this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
+        this.datafeedManager = datafeedManager;
     }
 
     @Override
@@ -73,14 +67,15 @@ public class TransportDeleteDatafeedAction extends AcknowledgedTransportMasterNo
         if (request.isForce()) {
             forceDeleteDatafeed(request, state, listener);
         } else {
-            deleteDatafeedConfig(request, listener);
+            datafeedManager.deleteDatafeed(request, state, listener);
         }
     }
 
     private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state,
                                      ActionListener<AcknowledgedResponse> listener) {
         ActionListener<Boolean> finalListener = ActionListener.wrap(
-                response -> deleteDatafeedConfig(request, listener),
+                // use clusterService.state() here so that the updated state without the task is available
+                response -> datafeedManager.deleteDatafeed(request, clusterService.state(), listener),
                 listener::onFailure
         );
 
@@ -119,37 +114,6 @@ public class TransportDeleteDatafeedAction extends AcknowledgedTransportMasterNo
         }
     }
 
-    private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
-        // Check datafeed is stopped
-        PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
-        if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) {
-            listener.onFailure(ExceptionsHelper.conflictStatusException(
-                    Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, request.getDatafeedId(), DatafeedState.STARTED)));
-            return;
-        }
-
-        String datafeedId = request.getDatafeedId();
-
-        datafeedConfigProvider.getDatafeedConfig(
-            datafeedId,
-            ActionListener.wrap(
-                datafeedConfigBuilder -> {
-                    String jobId = datafeedConfigBuilder.build().getJobId();
-                    JobDataDeleter jobDataDeleter = new JobDataDeleter(client, jobId);
-                    jobDataDeleter.deleteDatafeedTimingStats(
-                        ActionListener.wrap(
-                            unused1 -> {
-                                datafeedConfigProvider.deleteDatafeedConfig(
-                                    datafeedId,
-                                    ActionListener.wrap(
-                                        unused2 -> listener.onResponse(AcknowledgedResponse.TRUE),
-                                        listener::onFailure));
-                            },
-                            listener::onFailure));
-                },
-                listener::onFailure));
-    }
-
     @Override
     protected ClusterBlockException checkBlock(DeleteDatafeedAction.Request request, ClusterState state) {
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

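The "use clusterService.state() here" comment is the subtle part of the force path: the ClusterState passed into masterOperation was captured before the datafeed's persistent task was cancelled, so DatafeedManager.deleteDatafeed would still see the task and reject the delete with a conflict. Re-reading the state after isolation and task removal is what makes the stopped-datafeed check pass. Condensed from the hunk above:

    // Force path ordering: isolate the datafeed and cancel its persistent task
    // first, then delete the config against the *latest* cluster state, which
    // no longer contains the task.
    ActionListener<Boolean> finalListener = ActionListener.wrap(
        response -> datafeedManager.deleteDatafeed(request, clusterService.state(), listener),
        listener::onFailure
    );
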
+ 33 - 54
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
@@ -23,10 +24,9 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.CheckedConsumer;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.Task;
@@ -46,9 +46,8 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
+import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
-import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
-import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
@@ -68,8 +67,8 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
     private final Client client;
     private final PersistentTasksService persistentTasksService;
     private final AnomalyDetectionAuditor auditor;
-    private final JobResultsProvider jobResultsProvider;
     private final JobConfigProvider jobConfigProvider;
+    private final JobManager jobManager;
     private final DatafeedConfigProvider datafeedConfigProvider;
     private final MlMemoryTracker memoryTracker;
     private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
@@ -86,20 +85,20 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
     public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
                                     ThreadPool threadPool, ActionFilters actionFilters,
                                     IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
-                                    Client client, AnomalyDetectionAuditor auditor, JobResultsProvider jobResultsProvider,
+                                    Client client, AnomalyDetectionAuditor auditor,
                                     JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
-                                    MlMemoryTracker memoryTracker) {
+                                    MlMemoryTracker memoryTracker, JobManager jobManager) {
         super(DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
                 DeleteJobAction.Request::new, indexNameExpressionResolver, ThreadPool.Names.SAME);
         this.client = client;
         this.persistentTasksService = persistentTasksService;
         this.auditor = auditor;
-        this.jobResultsProvider = jobResultsProvider;
         this.jobConfigProvider = jobConfigProvider;
         this.datafeedConfigProvider = datafeedConfigProvider;
         this.memoryTracker = memoryTracker;
         this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
         this.listenersByJobId = new HashMap<>();
+        this.jobManager = jobManager;
     }
 
     @Override
@@ -116,7 +115,7 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
             return;
         }
 
-        logger.debug("Deleting job '{}'", request.getJobId());
+        logger.debug(() -> new ParameterizedMessage("[{}] deleting job", request.getJobId()));
 
         if (request.isForce() == false) {
             checkJobIsNotOpen(request.getJobId(), state);
@@ -128,8 +127,11 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
         // Check if there is a deletion task for this job already and if yes wait for it to complete
         synchronized (listenersByJobId) {
             if (listenersByJobId.containsKey(request.getJobId())) {
-                logger.debug("[{}] Deletion task [{}] will wait for existing deletion task to complete",
-                        request.getJobId(), task.getId());
+                logger.debug(() -> new ParameterizedMessage(
+                    "[{}] Deletion task [{}] will wait for existing deletion task to complete",
+                    request.getJobId(),
+                    task.getId()
+                ));
                 listenersByJobId.get(request.getJobId()).add(listener);
                 return;
             } else {
@@ -153,9 +155,9 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
         ActionListener<PutJobAction.Response> markAsDeletingListener = ActionListener.wrap(
                 response -> {
                     if (request.isForce()) {
-                        forceDeleteJob(parentTaskClient, request, finalListener);
+                        forceDeleteJob(parentTaskClient, request, state, finalListener);
                     } else {
-                        normalDeleteJob(parentTaskClient, request, finalListener);
+                        normalDeleteJob(parentTaskClient, request, state, finalListener);
                     }
                 },
                 finalListener::onFailure);
@@ -171,7 +173,7 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
                     logger.info(
                         "[{}] config is missing but task exists. Attempting to delete tasks and stop process",
                         request.getJobId());
-                    forceDeleteJob(parentTaskClient, request, finalListener);
+                    forceDeleteJob(parentTaskClient, request, state, finalListener);
                 } else {
                     finalListener.onFailure(e);
                 }
@@ -199,64 +201,41 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
         }
     }
 
-    private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
+    private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient,
+                                 DeleteJobAction.Request request,
+                                 ClusterState state,
                                  ActionListener<AcknowledgedResponse> listener) {
         String jobId = request.getJobId();
 
         // We clean up the memory tracker on delete rather than close as close is not a master node action
         memoryTracker.removeAnomalyDetectorJob(jobId);
 
-        // Step 4. When the job has been removed from the cluster state, return a response
-        // -------
-        CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
-            if (jobDeleted) {
-                logger.info("Job [" + jobId + "] deleted");
-                auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
-                listener.onResponse(AcknowledgedResponse.TRUE);
-            } else {
-                listener.onResponse(AcknowledgedResponse.FALSE);
-            }
-        };
-
-        // Step 3. When the physical storage has been deleted, delete the job config document
-        // -------
-        // Don't report an error if the document has already been deleted
-        CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false,
-                ActionListener.wrap(
-                        deleteResponse -> apiResponseHandler.accept(Boolean.TRUE),
-                        listener::onFailure
-                )
-        );
-
-        // Step 2. Remove the job from any calendars
-        CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId,
-                ActionListener.wrap(deleteJobStateHandler::accept, listener::onFailure));
-
-
-        // Step 1. Delete the physical storage
-        new JobDataDeleter(parentTaskClient, jobId).deleteJobDocuments(
-            jobConfigProvider, indexNameExpressionResolver, clusterService.state(), removeFromCalendarsHandler, listener::onFailure);
+        jobManager.deleteJob(request, parentTaskClient, state, listener);
     }
 
-    private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
-                                ActionListener<AcknowledgedResponse> listener) {
-
-        logger.debug("Force deleting job [{}]", request.getJobId());
+    private void forceDeleteJob(
+        ParentTaskAssigningClient parentTaskClient,
+        DeleteJobAction.Request request,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
 
-        final ClusterState state = clusterService.state();
         final String jobId = request.getJobId();
+        logger.debug(() -> new ParameterizedMessage("[{}] force deleting job", jobId));
 
         // 3. Delete the job
         ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
             @Override
             public void onResponse(Boolean response) {
-                normalDeleteJob(parentTaskClient, request, listener);
+                // use clusterService.state() here so that the updated state without the task is available
+                normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
             }
 
             @Override
             public void onFailure(Exception e) {
                 if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
-                    normalDeleteJob(parentTaskClient, request, listener);
+                    // use clusterService.state() here so that the updated state without the task is available
+                    normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
                 } else {
                     listener.onFailure(e);
                 }
@@ -266,12 +245,12 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
         // 2. Cancel the persistent task. This closes the process gracefully so
         // the process should be killed first.
         ActionListener<KillProcessAction.Response> killJobListener = ActionListener.wrap(
-                response -> removePersistentTask(request.getJobId(), state, removeTaskListener),
+                response -> removePersistentTask(jobId, state, removeTaskListener),
                 e -> {
                     if (ExceptionsHelper.unwrapCause(e) instanceof ElasticsearchStatusException) {
                         // Killing the process marks the task as completed so it
                         // may have disappeared when we get here
-                        removePersistentTask(request.getJobId(), state, removeTaskListener);
+                        removePersistentTask(jobId, state, removeTaskListener);
                     } else {
                         listener.onFailure(e);
                     }

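The logging changes in this file also switch from eager string formatting to log4j2's Supplier overload, so the message object is only built when DEBUG is actually enabled. A standalone illustration (not part of the commit):

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.message.ParameterizedMessage;

    class LazyLoggingDemo {
        private static final Logger logger = LogManager.getLogger(LazyLoggingDemo.class);

        void delete(String jobId) {
            // The lambda is invoked only if the DEBUG level is enabled, so the
            // ParameterizedMessage allocation and formatting are skipped otherwise.
            logger.debug(() -> new ParameterizedMessage("[{}] deleting job", jobId));
        }
    }
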
+ 8 - 61
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsAction.java

@@ -11,47 +11,34 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.core.action.util.QueryPage;
-import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
-import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
-import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
 
 public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction<GetDatafeedsAction.Request, GetDatafeedsAction.Response> {
 
     private static final Logger logger = LogManager.getLogger(TransportGetDatafeedsAction.class);
 
-    private final DatafeedConfigProvider datafeedConfigProvider;
+    private final DatafeedManager datafeedManager;
 
     @Inject
     public TransportGetDatafeedsAction(TransportService transportService,
                                        ClusterService clusterService, ThreadPool threadPool,
                                        ActionFilters actionFilters,
-                                       IndexNameExpressionResolver indexNameExpressionResolver,
-                                       Client client, NamedXContentRegistry xContentRegistry) {
+                                       DatafeedManager datafeedManager,
+                                       IndexNameExpressionResolver indexNameExpressionResolver) {
             super(GetDatafeedsAction.NAME, transportService, clusterService, threadPool, actionFilters,
                     GetDatafeedsAction.Request::new, indexNameExpressionResolver, GetDatafeedsAction.Response::new, ThreadPool.Names.SAME);
 
-        datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
+        this.datafeedManager = datafeedManager;
     }
 
     @Override
@@ -59,52 +46,12 @@ public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction<G
                                    ActionListener<GetDatafeedsAction.Response> listener) {
         logger.debug("Get datafeed '{}'", request.getDatafeedId());
 
-        Map<String, DatafeedConfig> clusterStateConfigs =
-                expandClusterStateDatafeeds(request.getDatafeedId(), request.allowNoMatch(), state);
-
-        datafeedConfigProvider.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoMatch(), ActionListener.wrap(
-                datafeedBuilders -> {
-                    // Check for duplicate datafeeds
-                    for (DatafeedConfig.Builder datafeed : datafeedBuilders) {
-                        if (clusterStateConfigs.containsKey(datafeed.getId())) {
-                            listener.onFailure(new IllegalStateException("Datafeed [" + datafeed.getId() + "] configuration " +
-                                    "exists in both clusterstate and index"));
-                            return;
-                        }
-                    }
-
-                    // Merge cluster state and index configs
-                    List<DatafeedConfig> datafeeds = new ArrayList<>(datafeedBuilders.size() + clusterStateConfigs.values().size());
-                    for (DatafeedConfig.Builder builder: datafeedBuilders) {
-                        datafeeds.add(builder.build());
-                    }
-
-                    datafeeds.addAll(clusterStateConfigs.values());
-                    Collections.sort(datafeeds, Comparator.comparing(DatafeedConfig::getId));
-                    listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeeds, datafeeds.size(),
-                            DatafeedConfig.RESULTS_FIELD)));
-                },
-                listener::onFailure
+        datafeedManager.getDatafeeds(request, state, ActionListener.wrap(
+            datafeeds -> listener.onResponse(new GetDatafeedsAction.Response(datafeeds)),
+            listener::onFailure
         ));
     }
 
-    Map<String, DatafeedConfig> expandClusterStateDatafeeds(String datafeedExpression, boolean allowNoMatch, ClusterState clusterState) {
-
-        Map<String, DatafeedConfig> configById = new HashMap<>();
-        try {
-            MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
-            Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoMatch);
-
-            for (String expandedDatafeedId : expandedDatafeedIds) {
-                configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId));
-            }
-        } catch (Exception e){
-            // ignore
-        }
-
-        return configById;
-    }
-
     @Override
     protected ClusterBlockException checkBlock(GetDatafeedsAction.Request request, ClusterState state) {
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);

+ 6 - 201
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java

@@ -6,247 +6,52 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.core.CheckedConsumer;
-import org.elasticsearch.core.Nullable;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.json.JsonXContent;
-import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.license.LicenseUtils;
-import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.ml.MlConfigIndex;
-import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
-import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
-import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
-import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
-import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
 import org.elasticsearch.xpack.core.security.SecurityContext;
-import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
-import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
-import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
-import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
-import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
-import org.elasticsearch.xpack.core.security.support.Exceptions;
-import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
-import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
-import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
-import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
-import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
 
 public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {
 
-    private static final Logger logger = LogManager.getLogger(TransportPutDatafeedAction.class);
-
     private final XPackLicenseState licenseState;
-    private final Client client;
     private final SecurityContext securityContext;
-    private final DatafeedConfigProvider datafeedConfigProvider;
-    private final JobConfigProvider jobConfigProvider;
-    private final NamedXContentRegistry xContentRegistry;
+    private final DatafeedManager datafeedManager;
 
     @Inject
     public TransportPutDatafeedAction(Settings settings, TransportService transportService,
-                                      ClusterService clusterService, ThreadPool threadPool, Client client,
+                                      ClusterService clusterService, ThreadPool threadPool,
                                       XPackLicenseState licenseState, ActionFilters actionFilters,
                                       IndexNameExpressionResolver indexNameExpressionResolver,
-                                      NamedXContentRegistry xContentRegistry) {
+                                      DatafeedManager datafeedManager) {
         super(PutDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, PutDatafeedAction.Request::new,
                 indexNameExpressionResolver, PutDatafeedAction.Response::new, ThreadPool.Names.SAME);
         this.licenseState = licenseState;
-        this.client = client;
         this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
                 new SecurityContext(settings, threadPool.getThreadContext()) : null;
-        this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
-        this.jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
-        this.xContentRegistry = xContentRegistry;
+        this.datafeedManager = datafeedManager;
     }
 
     @Override
     protected void masterOperation(Task task, PutDatafeedAction.Request request, ClusterState state,
                                    ActionListener<PutDatafeedAction.Response> listener) {
-        // If security is enabled only create the datafeed if the user requesting creation has
-        // permission to read the indices the datafeed is going to read from
-        if (licenseState.isSecurityEnabled()) {
-            useSecondaryAuthIfAvailable(securityContext, () -> {
-                final String[] indices = request.getDatafeed().getIndices().toArray(new String[0]);
-
-                final String username = securityContext.getUser().principal();
-                final HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
-                privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
-                privRequest.username(username);
-                privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
-
-                final RoleDescriptor.IndicesPrivileges.Builder indicesPrivilegesBuilder = RoleDescriptor.IndicesPrivileges.builder()
-                    .indices(indices);
-
-                ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
-                    r -> handlePrivsResponse(username, request, r, state, listener),
-                    listener::onFailure);
-
-                ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
-                    response -> {
-                        if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config
-                            indicesPrivilegesBuilder.privileges(SearchAction.NAME);
-                        } else {
-                            indicesPrivilegesBuilder.privileges(SearchAction.NAME, RollupSearchAction.NAME);
-                        }
-                        privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
-                        client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
-                    },
-                    e -> {
-                        if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
-                            indicesPrivilegesBuilder.privileges(SearchAction.NAME);
-                            privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
-                            client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
-                        } else {
-                            listener.onFailure(e);
-                        }
-                    }
-                );
-                if (RemoteClusterLicenseChecker.containsRemoteIndex(request.getDatafeed().getIndices())) {
-                    getRollupIndexCapsActionHandler.onResponse(new GetRollupIndexCapsAction.Response());
-                } else {
-                    executeAsyncWithOrigin(client,
-                        ML_ORIGIN,
-                        GetRollupIndexCapsAction.INSTANCE,
-                        new GetRollupIndexCapsAction.Request(indices),
-                        getRollupIndexCapsActionHandler);
-                }
-            });
-        } else {
-            putDatafeed(request, threadPool.getThreadContext().getHeaders(), state, listener);
-        }
-    }
-
-    private void handlePrivsResponse(String username,
-                                     PutDatafeedAction.Request request,
-                                     HasPrivilegesResponse response,
-                                     ClusterState clusterState,
-                                     ActionListener<PutDatafeedAction.Response> listener) throws IOException {
-        if (response.isCompleteMatch()) {
-            putDatafeed(request, threadPool.getThreadContext().getHeaders(), clusterState, listener);
-        } else {
-            XContentBuilder builder = JsonXContent.contentBuilder();
-            builder.startObject();
-            for (ResourcePrivileges index : response.getIndexPrivileges()) {
-                builder.field(index.getResource());
-                builder.map(index.getPrivileges());
-            }
-            builder.endObject();
-
-            listener.onFailure(Exceptions.authorizationError("Cannot create datafeed [{}]" +
-                            " because user {} lacks permissions on the indices: {}",
-                    request.getDatafeed().getId(), username, Strings.toString(builder)));
-        }
-    }
-
-    private void putDatafeed(PutDatafeedAction.Request request,
-                             Map<String, String> headers,
-                             ClusterState clusterState,
-                             ActionListener<PutDatafeedAction.Response> listener) {
-
-        String datafeedId = request.getDatafeed().getId();
-        String jobId = request.getDatafeed().getJobId();
-        ElasticsearchException validationError = checkConfigsAreNotDefinedInClusterState(datafeedId, jobId);
-        if (validationError != null) {
-            listener.onFailure(validationError);
-            return;
-        }
-        DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations(xContentRegistry));
-
-        CheckedConsumer<Boolean, Exception> mappingsUpdated = ok -> {
-            datafeedConfigProvider.putDatafeedConfig(
-                request.getDatafeed(),
-                headers,
-                ActionListener.wrap(
-                    indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())),
-                    listener::onFailure
-            ));
-        };
-
-        CheckedConsumer<Boolean, Exception> validationOk = ok -> {
-            if (clusterState == null) {
-                logger.warn("Cannot update doc mapping because clusterState == null");
-                mappingsUpdated.accept(false);
-                return;
-            }
-            ElasticsearchMappings.addDocMappingIfMissing(
-                MlConfigIndex.indexName(),
-                MlConfigIndex::mapping,
-                client,
-                clusterState,
-                request.masterNodeTimeout(),
-                ActionListener.wrap(mappingsUpdated, listener::onFailure));
-        };
-
-        CheckedConsumer<Boolean, Exception> jobOk = ok ->
-            jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ActionListener.wrap(validationOk, listener::onFailure));
-
-        checkJobDoesNotHaveADatafeed(jobId, ActionListener.wrap(jobOk, listener::onFailure));
-    }
-
-    /**
-     * Returns an exception if a datafeed with the same Id is defined in the
-     * cluster state or the job is in the cluster state and already has a datafeed
-     */
-    @Nullable
-    private ElasticsearchException checkConfigsAreNotDefinedInClusterState(String datafeedId, String jobId) {
-        ClusterState clusterState = clusterService.state();
-        MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
-
-        if (mlMetadata.getDatafeed(datafeedId) != null) {
-            return ExceptionsHelper.datafeedAlreadyExists(datafeedId);
-        }
-
-        if (mlMetadata.getDatafeedByJobId(jobId).isPresent()) {
-            return ExceptionsHelper.conflictStatusException("Cannot create datafeed [" + datafeedId + "] as a " +
-                    "job [" + jobId + "] defined in the cluster state references a datafeed with the same Id");
-        }
-
-        return null;
-    }
-
-    private void checkJobDoesNotHaveADatafeed(String jobId, ActionListener<Boolean> listener) {
-        datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
-                datafeedIds -> {
-                    if (datafeedIds.isEmpty()) {
-                        listener.onResponse(Boolean.TRUE);
-                    } else {
-                        listener.onFailure(ExceptionsHelper.conflictStatusException("A datafeed [" + datafeedIds.iterator().next()
-                                + "] already exists for job [" + jobId + "]"));
-                    }
-                },
-                listener::onFailure
-        ));
+        datafeedManager.putDatafeed(request, state, licenseState, securityContext, threadPool, listener);
     }
 
     @Override

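The bulk of the deletion here is the inline security check, which moves wholesale into DatafeedManager.putDatafeed (visible in the new file at the end of this diff). Condensed, the check asks whether the requesting user may search the datafeed's source indices, requiring the rollup search privilege only when rollup jobs are configured:

    // Condensed from the diff: build a has-privileges check over the datafeed's
    // indices; rollup caps decide whether RollupSearchAction.NAME is needed too.
    HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
    privRequest.username(securityContext.getUser().principal());
    privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
    privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
    privRequest.indexPrivileges(RoleDescriptor.IndicesPrivileges.builder()
        .indices(request.getDatafeed().getIndices().toArray(new String[0]))
        .privileges(SearchAction.NAME) // plus RollupSearchAction.NAME when rollup indices exist
        .build());
    client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
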
+ 5 - 55
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.ml.action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
-import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -17,89 +16,40 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.ml.MlConfigIndex;
-import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
-import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
-import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
-import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.security.SecurityContext;
-import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
-import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
-import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
 
-import java.util.Map;
-
-import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
 
 public class TransportUpdateDatafeedAction extends
     TransportMasterNodeAction<UpdateDatafeedAction.Request, PutDatafeedAction.Response> {
 
-    private final DatafeedConfigProvider datafeedConfigProvider;
-    private final JobConfigProvider jobConfigProvider;
-    private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
+    private final DatafeedManager datafeedManager;
     private final SecurityContext securityContext;
-    private final Client client;
 
     @Inject
     public TransportUpdateDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService,
                                          ThreadPool threadPool, ActionFilters actionFilters,
                                          IndexNameExpressionResolver indexNameExpressionResolver,
-                                         Client client, NamedXContentRegistry xContentRegistry) {
+                                         DatafeedManager datafeedManager) {
         super(UpdateDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, UpdateDatafeedAction.Request::new,
                 indexNameExpressionResolver, PutDatafeedAction.Response::new, ThreadPool.Names.SAME);
 
-        this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
-        this.jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
-        this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
+        this.datafeedManager = datafeedManager;
         this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
             new SecurityContext(settings, threadPool.getThreadContext()) : null;
-        this.client = client;
     }
 
     @Override
     protected void masterOperation(Task task, UpdateDatafeedAction.Request request, ClusterState state,
                                    ActionListener<PutDatafeedAction.Response> listener) {
 
-        if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) {
-            listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId()));
-            return;
-        }
-        // Check datafeed is stopped
-        PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
-        if (MlTasks.getDatafeedTask(request.getUpdate().getId(), tasks) != null) {
-            listener.onFailure(ExceptionsHelper.conflictStatusException(
-                Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE,
-                    request.getUpdate().getId(), DatafeedState.STARTED)));
-            return;
-        }
-
-        Runnable doUpdate = () ->
-            useSecondaryAuthIfAvailable(securityContext, () -> {
-                final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
-                datafeedConfigProvider.updateDatefeedConfig(
-                    request.getUpdate().getId(),
-                    request.getUpdate(),
-                    headers,
-                    jobConfigProvider::validateDatafeedJob,
-                    ActionListener.wrap(
-                        updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
-                        listener::onFailure));
-            });
-
-        // Obviously if we're updating a datafeed it's impossible that the config index has no mappings at
-        // all, but if we rewrite the datafeed config we may add new fields that require the latest mappings
-        ElasticsearchMappings.addDocMappingIfMissing(
-            MlConfigIndex.indexName(), MlConfigIndex::mapping, client, state, request.masterNodeTimeout(),
-            ActionListener.wrap(bool -> doUpdate.run(), listener::onFailure));
+        datafeedManager.updateDatafeed(request, state, securityContext, threadPool, listener);
     }
 
     @Override

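Update and delete now share the same precondition: a datafeed with a persistent task in the cluster state is STARTED, and mutating its config is rejected with a 409 conflict. DatafeedManager centralizes this in a small getDatafeedTask helper; condensed from the new file below:

    // Condensed from DatafeedManager below: resolve the datafeed's persistent
    // task from cluster state; a non-null task means the datafeed is running.
    PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
    if (MlTasks.getDatafeedTask(datafeedId, tasks) != null) {
        listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(
            Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE, datafeedId, DatafeedState.STARTED)));
        return;
    }
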
+ 402 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

@@ -0,0 +1,402 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.datafeed;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.license.RemoteClusterLicenseChecker;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.action.util.QueryPage;
+import org.elasticsearch.xpack.core.ml.MlConfigIndex;
+import org.elasticsearch.xpack.core.ml.MlMetadata;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
+import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
+import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
+import org.elasticsearch.xpack.core.security.SecurityContext;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
+import org.elasticsearch.xpack.core.security.support.Exceptions;
+import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
+import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
+
+/**
+ * Allows interactions with datafeeds. The managed interactions include:
+ * <ul>
+ * <li>creation</li>
+ * <li>reading</li>
+ * <li>deletion</li>
+ * <li>updating</li>
+ * </ul>
+ */
+public final class DatafeedManager {
+
+    private static final Logger logger = LogManager.getLogger(DatafeedManager.class);
+
+    private final DatafeedConfigProvider datafeedConfigProvider;
+    private final JobConfigProvider jobConfigProvider;
+    private final NamedXContentRegistry xContentRegistry;
+    private final ClusterService clusterService;
+    private final Client client;
+    private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
+
+    public DatafeedManager(DatafeedConfigProvider datafeedConfigProvider,
+                           JobConfigProvider jobConfigProvider,
+                           NamedXContentRegistry xContentRegistry,
+                           ClusterService clusterService,
+                           Settings settings,
+                           Client client) {
+        this.datafeedConfigProvider = datafeedConfigProvider;
+        this.jobConfigProvider = jobConfigProvider;
+        this.xContentRegistry = xContentRegistry;
+        this.clusterService = clusterService;
+        this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
+        this.client = client;
+    }
+
+    public void putDatafeed(
+        PutDatafeedAction.Request request,
+        ClusterState state,
+        XPackLicenseState licenseState,
+        SecurityContext securityContext,
+        ThreadPool threadPool,
+        ActionListener<PutDatafeedAction.Response> listener
+    ) {
+        if (licenseState.isSecurityEnabled()) {
+            useSecondaryAuthIfAvailable(securityContext, () -> {
+                final String[] indices = request.getDatafeed().getIndices().toArray(new String[0]);
+
+                final String username = securityContext.getUser().principal();
+                final HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
+                privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
+                privRequest.username(username);
+                privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
+
+                final RoleDescriptor.IndicesPrivileges.Builder indicesPrivilegesBuilder = RoleDescriptor.IndicesPrivileges.builder()
+                    .indices(indices);
+
+                ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
+                    r -> handlePrivsResponse(username, request, r, state, threadPool, listener),
+                    listener::onFailure);
+
+                ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
+                    response -> {
+                        if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config
+                            indicesPrivilegesBuilder.privileges(SearchAction.NAME);
+                        } else {
+                            indicesPrivilegesBuilder.privileges(SearchAction.NAME, RollupSearchAction.NAME);
+                        }
+                        privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
+                        client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
+                    },
+                    e -> {
+                        if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
+                            indicesPrivilegesBuilder.privileges(SearchAction.NAME);
+                            privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
+                            client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
+                        } else {
+                            listener.onFailure(e);
+                        }
+                    }
+                );
+                if (RemoteClusterLicenseChecker.containsRemoteIndex(request.getDatafeed().getIndices())) {
+                    getRollupIndexCapsActionHandler.onResponse(new GetRollupIndexCapsAction.Response());
+                } else {
+                    executeAsyncWithOrigin(client,
+                        ML_ORIGIN,
+                        GetRollupIndexCapsAction.INSTANCE,
+                        new GetRollupIndexCapsAction.Request(indices),
+                        getRollupIndexCapsActionHandler);
+                }
+            });
+        } else {
+            putDatafeed(request, threadPool.getThreadContext().getHeaders(), state, listener);
+        }
+    }
+
+    public void getDatafeeds(GetDatafeedsAction.Request request, ActionListener<QueryPage<DatafeedConfig>> listener) {
+        getDatafeeds(request, clusterService.state(), listener);
+    }
+
+    public void getDatafeeds(GetDatafeedsAction.Request request, ClusterState state, ActionListener<QueryPage<DatafeedConfig>> listener) {
+        Map<String, DatafeedConfig> clusterStateConfigs = expandClusterStateDatafeeds(
+            request.getDatafeedId(),
+            request.allowNoMatch(),
+            state
+        );
+
+        datafeedConfigProvider.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoMatch(), ActionListener.wrap(
+            datafeedBuilders -> {
+                // Check for duplicate datafeeds
+                for (DatafeedConfig.Builder datafeed : datafeedBuilders) {
+                    if (clusterStateConfigs.containsKey(datafeed.getId())) {
+                        listener.onFailure(new IllegalStateException("Datafeed [" + datafeed.getId() + "] configuration " +
+                            "exists in both clusterstate and index"));
+                        return;
+                    }
+                }
+
+                // Merge cluster state and index configs
+                List<DatafeedConfig> datafeeds = new ArrayList<>(datafeedBuilders.size() + clusterStateConfigs.values().size());
+                for (DatafeedConfig.Builder builder : datafeedBuilders) {
+                    datafeeds.add(builder.build());
+                }
+
+                datafeeds.addAll(clusterStateConfigs.values());
+                Collections.sort(datafeeds, Comparator.comparing(DatafeedConfig::getId));
+                listener.onResponse(new QueryPage<>(datafeeds, datafeeds.size(), DatafeedConfig.RESULTS_FIELD));
+            },
+            listener::onFailure
+        ));
+    }
+
+    public void updateDatafeed(
+        UpdateDatafeedAction.Request request,
+        ClusterState state,
+        SecurityContext securityContext,
+        ThreadPool threadPool,
+        ActionListener<PutDatafeedAction.Response> listener
+    ) {
+        if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) {
+            listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId()));
+            return;
+        }
+        // Check datafeed is stopped
+        if (getDatafeedTask(state, request.getUpdate().getId()) != null) {
+            listener.onFailure(ExceptionsHelper.conflictStatusException(
+                Messages.getMessage(Messages.DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE,
+                    request.getUpdate().getId(), DatafeedState.STARTED)));
+            return;
+        }
+
+        Runnable doUpdate = () ->
+            useSecondaryAuthIfAvailable(securityContext, () -> {
+                final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
+                datafeedConfigProvider.updateDatefeedConfig(
+                    request.getUpdate().getId(),
+                    request.getUpdate(),
+                    headers,
+                    jobConfigProvider::validateDatafeedJob,
+                    ActionListener.wrap(
+                        updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
+                        listener::onFailure));
+            });
+
+        // Obviously if we're updating a datafeed it's impossible that the config index has no mappings at
+        // all, but if we rewrite the datafeed config we may add new fields that require the latest mappings
+        ElasticsearchMappings.addDocMappingIfMissing(
+            MlConfigIndex.indexName(), MlConfigIndex::mapping, client, state, request.masterNodeTimeout(),
+            ActionListener.wrap(bool -> doUpdate.run(), listener::onFailure));
+    }
+
+    public void deleteDatafeed(
+        DeleteDatafeedAction.Request request,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        if (getDatafeedTask(state, request.getDatafeedId()) != null) {
+            listener.onFailure(ExceptionsHelper.conflictStatusException(
+                Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, request.getDatafeedId(), DatafeedState.STARTED)));
+            return;
+        }
+
+        String datafeedId = request.getDatafeedId();
+
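+        // Resolve the config to find its parent job, delete the datafeed timing
+        // stats from that job's results, and only then remove the config document.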
+        datafeedConfigProvider.getDatafeedConfig(
+            datafeedId,
+            ActionListener.wrap(
+                datafeedConfigBuilder -> {
+                    String jobId = datafeedConfigBuilder.build().getJobId();
+                    JobDataDeleter jobDataDeleter = new JobDataDeleter(client, jobId);
+                    jobDataDeleter.deleteDatafeedTimingStats(
+                        ActionListener.wrap(
+                            unused1 -> {
+                                datafeedConfigProvider.deleteDatafeedConfig(
+                                    datafeedId,
+                                    ActionListener.wrap(
+                                        unused2 -> listener.onResponse(AcknowledgedResponse.TRUE),
+                                        listener::onFailure));
+                            },
+                            listener::onFailure));
+                },
+                listener::onFailure));
+    }
+
+    private PersistentTasksCustomMetadata.PersistentTask<?> getDatafeedTask(ClusterState state, String datafeedId) {
+        PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
+        return MlTasks.getDatafeedTask(datafeedId, tasks);
+    }
+
+    private Map<String, DatafeedConfig> expandClusterStateDatafeeds(
+        String datafeedExpression,
+        boolean allowNoMatch,
+        ClusterState clusterState
+    ) {
+        Map<String, DatafeedConfig> configById = new HashMap<>();
+        try {
+            MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
+            Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoMatch);
+
+            for (String expandedDatafeedId : expandedDatafeedIds) {
+                configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId));
+            }
+        } catch (Exception e) {
+            // Ignore: the expression may match nothing in the legacy cluster
+            // state configs; the index-backed expansion in getDatafeeds still
+            // enforces allowNoMatch.
+        }
+
+        return configById;
+    }
+
+    private void handlePrivsResponse(String username,
+                                     PutDatafeedAction.Request request,
+                                     HasPrivilegesResponse response,
+                                     ClusterState clusterState,
+                                     ThreadPool threadPool,
+                                     ActionListener<PutDatafeedAction.Response> listener) throws IOException {
+        if (response.isCompleteMatch()) {
+            putDatafeed(request, threadPool.getThreadContext().getHeaders(), clusterState, listener);
+        } else {
+            XContentBuilder builder = JsonXContent.contentBuilder();
+            builder.startObject();
+            for (ResourcePrivileges index : response.getIndexPrivileges()) {
+                builder.field(index.getResource());
+                builder.map(index.getPrivileges());
+            }
+            builder.endObject();
+
+            listener.onFailure(Exceptions.authorizationError("Cannot create datafeed [{}]" +
+                    " because user {} lacks permissions on the indices: {}",
+                request.getDatafeed().getId(), username, Strings.toString(builder)));
+        }
+    }
+
+    private void putDatafeed(PutDatafeedAction.Request request,
+                             Map<String, String> headers,
+                             ClusterState clusterState,
+                             ActionListener<PutDatafeedAction.Response> listener) {
+
+        String datafeedId = request.getDatafeed().getId();
+        String jobId = request.getDatafeed().getJobId();
+        ElasticsearchException validationError = checkConfigsAreNotDefinedInClusterState(datafeedId, jobId);
+        if (validationError != null) {
+            listener.onFailure(validationError);
+            return;
+        }
+        DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations(xContentRegistry));
+
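+        // The CheckedConsumers below are declared in reverse execution order;
+        // each one is the continuation of the step declared after it:
+        // checkJobDoesNotHaveADatafeed -> jobOk -> validationOk -> mappingsUpdated.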
+        CheckedConsumer<Boolean, Exception> mappingsUpdated = ok -> {
+            datafeedConfigProvider.putDatafeedConfig(
+                request.getDatafeed(),
+                headers,
+                ActionListener.wrap(
+                    indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())),
+                    listener::onFailure
+                ));
+        };
+
+        CheckedConsumer<Boolean, Exception> validationOk = ok -> {
+            if (clusterState == null) {
+                logger.warn("Cannot update doc mapping because clusterState == null");
+                mappingsUpdated.accept(false);
+                return;
+            }
+            ElasticsearchMappings.addDocMappingIfMissing(
+                MlConfigIndex.indexName(),
+                MlConfigIndex::mapping,
+                client,
+                clusterState,
+                request.masterNodeTimeout(),
+                ActionListener.wrap(mappingsUpdated, listener::onFailure));
+        };
+
+        CheckedConsumer<Boolean, Exception> jobOk = ok ->
+            jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ActionListener.wrap(validationOk, listener::onFailure));
+
+        checkJobDoesNotHaveADatafeed(jobId, ActionListener.wrap(jobOk, listener::onFailure));
+    }
+
+    /**
+     * Returns an exception if a datafeed with the same ID is defined in the
+     * cluster state, or if the job is defined in the cluster state and already has a datafeed
+     */
+    @Nullable
+    private ElasticsearchException checkConfigsAreNotDefinedInClusterState(String datafeedId, String jobId) {
+        ClusterState clusterState = clusterService.state();
+        MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
+
+        if (mlMetadata.getDatafeed(datafeedId) != null) {
+            return ExceptionsHelper.datafeedAlreadyExists(datafeedId);
+        }
+
+        if (mlMetadata.getDatafeedByJobId(jobId).isPresent()) {
+            return ExceptionsHelper.conflictStatusException("Cannot create datafeed [" + datafeedId + "] as a " +
+                "job [" + jobId + "] defined in the cluster state references a datafeed with the same ID");
+        }
+
+        return null;
+    }
+
+    private void checkJobDoesNotHaveADatafeed(String jobId, ActionListener<Boolean> listener) {
+        datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
+            datafeedIds -> {
+                if (datafeedIds.isEmpty()) {
+                    listener.onResponse(Boolean.TRUE);
+                } else {
+                    listener.onFailure(ExceptionsHelper.conflictStatusException("A datafeed [" + datafeedIds.iterator().next()
+                        + "] already exists for job [" + jobId + "]"));
+                }
+            },
+            listener::onFailure
+        ));
+    }
+}

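The getDatafeeds flow above merges index-backed configs with any datafeeds still
living in the legacy cluster state. A minimal, dependency-free sketch of that
merge (plain Java; the Config class and merge method are illustrative stand-ins,
not the actual Elasticsearch types):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class ConfigMergeSketch {

    static final class Config {
        final String id;
        Config(String id) { this.id = id; }
    }

    // The two sources must be disjoint; the merged result is sorted by id,
    // mirroring DatafeedManager#getDatafeeds.
    static List<Config> merge(Map<String, Config> clusterStateConfigs, List<Config> indexConfigs) {
        for (Config config : indexConfigs) {
            if (clusterStateConfigs.containsKey(config.id)) {
                throw new IllegalStateException(
                    "Datafeed [" + config.id + "] configuration exists in both cluster state and index");
            }
        }
        List<Config> merged = new ArrayList<>(indexConfigs);
        merged.addAll(clusterStateConfigs.values());
        merged.sort(Comparator.comparing((Config config) -> config.id));
        return merged;
    }
}
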
+ 56 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

@@ -14,8 +14,10 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
@@ -37,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
@@ -58,6 +61,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
 import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
@@ -102,6 +106,7 @@ public class JobManager {
     private final UpdateJobProcessNotifier updateJobProcessNotifier;
     private final JobConfigProvider jobConfigProvider;
     private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
+    private final IndexNameExpressionResolver indexNameExpressionResolver;
 
     private volatile ByteSizeValue maxModelMemoryLimit;
 
@@ -111,7 +116,7 @@ public class JobManager {
     public JobManager(Environment environment, Settings settings, JobResultsProvider jobResultsProvider,
                       JobResultsPersister jobResultsPersister, ClusterService clusterService, AnomalyDetectionAuditor auditor,
                       ThreadPool threadPool, Client client, UpdateJobProcessNotifier updateJobProcessNotifier,
-                      NamedXContentRegistry xContentRegistry) {
+                      NamedXContentRegistry xContentRegistry, IndexNameExpressionResolver indexNameExpressionResolver) {
         this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
         this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
         this.clusterService = Objects.requireNonNull(clusterService);
@@ -121,6 +126,7 @@ public class JobManager {
         this.updateJobProcessNotifier = updateJobProcessNotifier;
         this.jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
         this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
+        this.indexNameExpressionResolver = indexNameExpressionResolver;
 
         maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings);
         clusterService.getClusterSettings()
@@ -410,6 +416,55 @@ public class JobManager {
         }
     }
 
+    public void deleteJob(DeleteJobAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
+        deleteJob(request, client, state, listener);
+    }
+
+    public void deleteJob(
+        DeleteJobAction.Request request,
+        Client client,
+        ClusterState state,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        final String jobId = request.getJobId();
+
+        // Step 4. When the job has been removed from the cluster state, return a response
+        // -------
+        CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
+            if (jobDeleted) {
+                logger.info("Job [" + jobId + "] deleted");
+                auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
+                listener.onResponse(AcknowledgedResponse.TRUE);
+            } else {
+                listener.onResponse(AcknowledgedResponse.FALSE);
+            }
+        };
+
+        // Step 3. When the physical storage has been deleted, delete the job config document
+        // -------
+        // Don't report an error if the document has already been deleted
+        CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false,
+            ActionListener.wrap(
+                deleteResponse -> apiResponseHandler.accept(Boolean.TRUE),
+                listener::onFailure
+            )
+        );
+
+        // Step 2. Remove the job from any calendars
+        CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId,
+            ActionListener.wrap(deleteJobStateHandler, listener::onFailure));
+
+        // Step 1. Delete the physical storage
+        new JobDataDeleter(client, jobId).deleteJobDocuments(
+            jobConfigProvider,
+            indexNameExpressionResolver,
+            state,
+            removeFromCalendarsHandler,
+            listener::onFailure
+        );
+    }
+
     private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, ActionListener<PutJobAction.Response> actionListener) {
         // Autodetect must be updated if the fields that the C++ uses are changed
         if (request.getJobUpdate().isAutodetectProcessUpdate()) {

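In deleteJob above, the step handlers are declared in reverse order (Step 4
first) because each step must capture the next one as its continuation. A
dependency-free sketch of the same callback shape (all names illustrative; the
real steps call into JobDataDeleter, JobResultsProvider and JobConfigProvider):

import java.util.function.Consumer;

final class DeleteChainSketch {

    static void deleteJob(String jobId, Consumer<Boolean> onDone) {
        // Step 4 (declared first): report the final acknowledgement.
        Consumer<Boolean> ack = deleted -> onDone.accept(deleted);
        // Step 3: delete the job config document, then acknowledge success.
        Consumer<Void> deleteConfig = ignored -> ack.accept(Boolean.TRUE);
        // Step 2: remove the job from any calendars, then delete the config.
        Consumer<Void> removeFromCalendars = ignored -> deleteConfig.accept(null);
        // Step 1 (declared last, runs first): delete the physical storage.
        removeFromCalendars.accept(null);
    }
}
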
+ 4 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.test.ESTestCase;
@@ -667,7 +668,9 @@ public class JobManagerTests extends ESTestCase {
             threadPool,
             client,
             updateJobProcessNotifier,
-            xContentRegistry());
+            xContentRegistry(),
+            TestIndexNameExpressionResolver.newInstance()
+        );
     }
 
     private ClusterState createClusterState() {