Browse Source

[ML] Throw an error when a datafeed needs CCS but it is not enabled for the node (#46044)

Though we allow CCS within datafeeds, users could prevent nodes from accessing remote clusters. This can cause mysterious errors and difficult to troubleshoot.

This commit adds a check to verify that `cluster.remote.connect` is enabled on the current node when a datafeed is configured with a remote index pattern.
Benjamin Trent 6 years ago
parent
commit
9f716fbd4c

+ 5 - 0
docs/reference/ml/anomaly-detection/apis/put-datafeed.asciidoc

@@ -72,6 +72,11 @@ those same roles.
 `indices`::
   (Required, array) An array of index names. Wildcards are supported. For
   example: `["it_ops_metrics", "server*"]`.
++
+--
+NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
+not be set to `false` on any ML node.
+--
 
 `job_id`::
  (Required, string) A numerical character string that uniquely identifies the

+ 3 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -49,6 +49,9 @@ public final class Messages {
     public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
             "Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
     public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
+    public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" +
+        " but the current node [{2}] is not allowed to connect to remote clusters." +
+        " Please enable cluster.remote.connect for all machine learning nodes.";
 
     public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
     public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";

+ 3 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -546,7 +546,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                 jobConfigProvider,
                 jobResultsProvider,
                 datafeedConfigProvider,
-                jobResultsPersister);
+                jobResultsPersister,
+                settings,
+                clusterService.getNodeName());
         DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
                 System::currentTimeMillis, auditor, autodetectProcessManager);
         this.datafeedManager.set(datafeedManager);

+ 13 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

@@ -37,6 +37,7 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -47,6 +48,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@@ -85,6 +87,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
     private final AnomalyDetectionAuditor auditor;
     private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
     private final NamedXContentRegistry xContentRegistry;
+    private final boolean remoteClusterSearchSupported;
 
     @Inject
     public TransportStartDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
@@ -103,6 +106,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
         this.auditor = auditor;
         this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
         this.xContentRegistry = xContentRegistry;
+        this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
     }
 
     static void validate(Job job,
@@ -181,7 +185,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
                 };
 
         // Verify data extractor factory can be created, then start persistent task
-        Consumer<Job> createDataExtrator = job -> {
+        Consumer<Job> createDataExtractor = job -> {
                 if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) {
                     final RemoteClusterLicenseChecker remoteClusterLicenseChecker =
                             new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode);
@@ -193,6 +197,13 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
                                     response -> {
                                         if (response.isSuccess() == false) {
                                             listener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
+                                        } else if (remoteClusterSearchSupported == false) {
+                                            listener.onFailure(
+                                                ExceptionsHelper.badRequestException(Messages.getMessage(
+                                                    Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
+                                                    datafeedConfigHolder.get().getId(),
+                                                    RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
+                                                    clusterService.getNodeName())));
                                         } else {
                                             createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
                                         }
@@ -214,7 +225,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
                         Job job = jobBuilder.build();
                         validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry);
                         auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry);
-                        createDataExtrator.accept(job);
+                        createDataExtractor.accept(job);
                     } catch (Exception e) {
                         listener.onFailure(e);
                     }

+ 23 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

@@ -8,17 +8,22 @@ package org.elasticsearch.xpack.ml.datafeed;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.license.RemoteClusterLicenseChecker;
+import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
 import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -30,6 +35,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -45,11 +51,13 @@ public class DatafeedJobBuilder {
     private final JobResultsProvider jobResultsProvider;
     private final DatafeedConfigProvider datafeedConfigProvider;
     private final JobResultsPersister jobResultsPersister;
+    private final boolean remoteClusterSearchSupported;
+    private final String nodeName;
 
     public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor,
                               Supplier<Long> currentTimeSupplier, JobConfigProvider jobConfigProvider,
                               JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider,
-                              JobResultsPersister jobResultsPersister) {
+                              JobResultsPersister jobResultsPersister, Settings settings, String nodeName) {
         this.client = client;
         this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
         this.auditor = Objects.requireNonNull(auditor);
@@ -58,6 +66,8 @@ public class DatafeedJobBuilder {
         this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
         this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider);
         this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
+        this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
+        this.nodeName = nodeName;
     }
 
     void build(String datafeedId, ActionListener<DatafeedJob> listener) {
@@ -168,6 +178,18 @@ public class DatafeedJobBuilder {
                 configBuilder -> {
                     try {
                         datafeedConfigHolder.set(configBuilder.build());
+                        if (remoteClusterSearchSupported == false) {
+                            List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices());
+                            if (remoteIndices.isEmpty() == false) {
+                                listener.onFailure(
+                                    ExceptionsHelper.badRequestException(Messages.getMessage(
+                                        Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
+                                        configBuilder.getId(),
+                                        remoteIndices,
+                                        nodeName)));
+                                return;
+                            }
+                        }
                         jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener);
                     } catch (Exception e) {
                         listener.onFailure(e);

+ 45 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java

@@ -13,10 +13,12 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.mock.orig.Mockito;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
@@ -92,7 +94,9 @@ public class DatafeedJobBuilderTests extends ESTestCase {
                 jobConfigProvider,
                 jobResultsProvider,
                 datafeedConfigProvider,
-                jobResultsPersister);
+                jobResultsPersister,
+                Settings.EMPTY,
+                "test_node");
     }
 
     public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
@@ -202,6 +206,46 @@ public class DatafeedJobBuilderTests extends ESTestCase {
         verify(taskHandler).accept(error);
     }
 
+    public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
+        Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
+        datafeedJobBuilder =
+            new DatafeedJobBuilder(
+                client,
+                xContentRegistry(),
+                auditor,
+                System::currentTimeMillis,
+                jobConfigProvider,
+                jobResultsProvider,
+                datafeedConfigProvider,
+                jobResultsPersister,
+                settings,
+                "test_node");
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeField("time");
+        Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
+        jobBuilder.setDataDescription(dataDescription);
+        jobBuilder.setCreateTime(new Date());
+        DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId());
+        datafeed.setIndices(Collections.singletonList("remotecluster:index-*"));
+
+        AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
+        ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
+            datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled"),
+            e -> {
+                assertThat(e.getMessage(), equalTo(Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
+                    "datafeed1",
+                    "[remotecluster:index-*]",
+                    "test_node")));
+                wasHandlerCalled.compareAndSet(false, true);
+            }
+        );
+
+        givenJob(jobBuilder);
+        givenDatafeed(datafeed);
+        datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
+        assertBusy(() -> wasHandlerCalled.get());
+    }
+
     private void givenJob(Job.Builder job) {
         Mockito.doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")