
[ML] fix data frame analytics when there are no ML nodes but lazy node allocation is allowed (#67840)

We cannot calculate memory size estimates if there are no ML nodes.

But if lazy ML nodes are allowed (or lazy start is enabled in the analytics config), we should still be able to start the job.

In _explain, if there are no ML nodes but lazy ML nodes are allowed (or the data frame analytics config allows lazy start), we simply skip the memory estimate and return the default of 1gb.
Benjamin Trent, 4 years ago
commit 8a0aad2683
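At a high level, the _explain fallback works like the sketch below. This is illustrative only, not the code from this commit: hasMlNode, estimateMemory, and respondWithDefaultEstimate are hypothetical helpers, while MAX_LAZY_ML_NODES, isAllowLazyStart(), and the error message match the setting, config flag, and failure used in the diffs that follow.

    // Illustrative sketch of the new decision in the _explain path: with no ML node
    // in the cluster, only fail when lazy node allocation is also impossible;
    // otherwise skip the native memory estimate and answer with the default limit.
    int maxLazyMlNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings); // real code keeps this in a volatile field updated by a settings consumer
    if (hasMlNode(clusterService.state())) {                      // hypothetical helper
        estimateMemory(config, listener);                         // normal path: run the native memory estimate
    } else if (maxLazyMlNodes > 0 || config.isAllowLazyStart()) {
        // No ML node yet, but one may be allocated lazily: skip the estimate and
        // return the default model memory limit (1gb) instead of failing.
        respondWithDefaultEstimate(listener);                     // hypothetical helper
    } else {
        listener.onFailure(ExceptionsHelper.badRequestException("No ML node to run on"));
    }

The real change threads this through redirectToMlNode and a shouldEstimateMemory flag in TransportExplainDataFrameAnalyticsAction, shown in the last diff below.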

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -69,6 +69,8 @@ public final class Messages {
     public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE_HIGHER_THAN_CONFIGURED =
         "Configured model memory limit [{0}] is lower than the expected memory usage [{1}]. " +
             "The analytics job may fail due to configured memory constraints.";
+    public static final String DATA_FRAME_ANALYTICS_AUDIT_UNABLE_TO_ESTIMATE_MEMORY_USAGE =
+        "Data frame analytics is unable to provide an accurate estimate. Unable to determine if configured memory [{0}] is adequate.";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
     public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]";
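For reference, the new constant is a MessageFormat template; the transport action change further down renders it with the job's configured model memory limit, roughly like this:

    // Fills the {0} placeholder with the configured model memory limit, producing e.g.
    // "Data frame analytics is unable to provide an accurate estimate. Unable to
    //  determine if configured memory [1gb] is adequate."
    String warning = Messages.getMessage(
        Messages.DATA_FRAME_ANALYTICS_AUDIT_UNABLE_TO_ESTIMATE_MEMORY_USAGE,
        config.getModelMemoryLimit());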

+ 0 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsRestIT.java

@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 
 public class ExplainDataFrameAnalyticsRestIT extends ESRestTestCase {

+ 226 - 0
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsLazyStartIT.java

@@ -0,0 +1,226 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.integration;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
+import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
+import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
+import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
+import org.junit.Before;
+
+import static org.elasticsearch.test.NodeRoles.onlyRoles;
+import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig.DEFAULT_MODEL_MEMORY_LIMIT;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class DataFrameAnalyticsLazyStartIT extends BaseMlIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
+    }
+
+    @Before
+    public void setupCluster() throws Exception {
+        internalCluster().ensureAtMostNumDataNodes(0);
+        logger.info("Starting dedicated master node...");
+        internalCluster().startMasterOnlyNode();
+        logger.info("Starting data node...");
+        internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE)));
+        ensureStableCluster();
+    }
+
+    private void ensureStableCluster() {
+        ensureStableCluster(internalCluster().getNodeNames().length, TimeValue.timeValueSeconds(60));
+    }
+
+    public void testNoMlNodesLazyStart() throws Exception {
+        String indexName = "data";
+        createIndex(indexName);
+
+        DataFrameAnalyticsConfig.Builder dataFrameAnalyticsConfig = new DataFrameAnalyticsConfig
+            .Builder()
+            .setSource(new DataFrameAnalyticsSource(new String[]{indexName}, null, null))
+            .setAnalysis(new OutlierDetection.Builder().build())
+            .setDest(new DataFrameAnalyticsDest("foo", null));
+        {
+            String analyticsId = "not-lazy-dfa";
+            client().execute(
+                PutDataFrameAnalyticsAction.INSTANCE,
+                new PutDataFrameAnalyticsAction.Request(dataFrameAnalyticsConfig.setId(analyticsId).build()))
+                .actionGet();
+            Exception ex = expectThrows(Exception.class,
+                () -> client().execute(
+                    StartDataFrameAnalyticsAction.INSTANCE,
+                    new StartDataFrameAnalyticsAction.Request(analyticsId)
+                ).actionGet());
+            assertThat(ex.getMessage(), containsString("No ML node to run on"));
+        }
+        {
+            String analyticsId = "lazy-dfa";
+            client().execute(
+                PutDataFrameAnalyticsAction.INSTANCE,
+                new PutDataFrameAnalyticsAction.Request(dataFrameAnalyticsConfig.setId(analyticsId).setAllowLazyStart(true).build()))
+                .actionGet();
+            client().execute(StartDataFrameAnalyticsAction.INSTANCE, new StartDataFrameAnalyticsAction.Request(analyticsId)).actionGet();
+            // it is starting lazily
+            assertBusy(() -> {
+                assertThat(client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE,
+                    new GetDataFrameAnalyticsStatsAction.Request(analyticsId))
+                    .actionGet()
+                    .getResponse()
+                    .results()
+                    .get(0)
+                    .getState(), equalTo(DataFrameAnalyticsState.STARTING));
+            });
+            client().execute(StopDataFrameAnalyticsAction.INSTANCE, new StopDataFrameAnalyticsAction.Request(analyticsId)).actionGet();
+            assertBusy(() -> {
+                assertThat(client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE,
+                    new GetDataFrameAnalyticsStatsAction.Request(analyticsId))
+                    .actionGet()
+                    .getResponse()
+                    .results()
+                    .get(0)
+                    .getState(), equalTo(DataFrameAnalyticsState.STOPPED));
+            });
+        }
+    }
+
+    public void testNoMlNodesButWithLazyNodes() throws Exception {
+        String indexName = "data";
+        createIndex(indexName);
+
+        client()
+            .admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setTransientSettings(Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 10_000))
+            .get();
+
+        String analyticsId = "not-lazy-dfa-with-lazy-nodes";
+        DataFrameAnalyticsConfig.Builder dataFrameAnalyticsConfig = new DataFrameAnalyticsConfig
+            .Builder()
+            .setId(analyticsId)
+            .setSource(new DataFrameAnalyticsSource(new String[]{indexName}, null, null))
+            .setAnalysis(new OutlierDetection.Builder().build())
+            .setDest(new DataFrameAnalyticsDest("foo", null));
+        client().execute(
+            PutDataFrameAnalyticsAction.INSTANCE,
+            new PutDataFrameAnalyticsAction.Request(dataFrameAnalyticsConfig.setId(analyticsId).build()))
+            .actionGet();
+        client().execute(StartDataFrameAnalyticsAction.INSTANCE, new StartDataFrameAnalyticsAction.Request(analyticsId)).actionGet();
+        // it is starting lazily
+        assertBusy(() -> {
+            assertThat(client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE,
+                new GetDataFrameAnalyticsStatsAction.Request(analyticsId))
+                .actionGet()
+                .getResponse()
+                .results()
+                .get(0)
+                .getState(), equalTo(DataFrameAnalyticsState.STARTING));
+        });
+        client().execute(StopDataFrameAnalyticsAction.INSTANCE, new StopDataFrameAnalyticsAction.Request(analyticsId)).actionGet();
+        assertBusy(() -> {
+            assertThat(client().execute(GetDataFrameAnalyticsStatsAction.INSTANCE,
+                new GetDataFrameAnalyticsStatsAction.Request(analyticsId))
+                .actionGet()
+                .getResponse()
+                .results()
+                .get(0)
+                .getState(), equalTo(DataFrameAnalyticsState.STOPPED));
+        });
+
+        client()
+            .admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setTransientSettings(Settings.builder().putNull(MachineLearning.MAX_LAZY_ML_NODES.getKey()))
+            .get();
+    }
+
+    public void testExplainWithLazyStartSet() {
+        String indexName = "data";
+        createIndex(indexName);
+
+        String analyticsId = "not-lazy-dfa-with-lazy-nodes";
+        DataFrameAnalyticsConfig.Builder dataFrameAnalyticsConfig = new DataFrameAnalyticsConfig
+            .Builder()
+            .setId(analyticsId)
+            .setSource(new DataFrameAnalyticsSource(new String[]{indexName}, null, null))
+            .setAnalysis(new OutlierDetection.Builder().build())
+            .setDest(new DataFrameAnalyticsDest("foo", null));
+
+        Exception ex = expectThrows(Exception.class, () -> client().execute(
+            ExplainDataFrameAnalyticsAction.INSTANCE,
+            new PutDataFrameAnalyticsAction.Request(dataFrameAnalyticsConfig.setId(analyticsId).buildForExplain()))
+            .actionGet());
+        assertThat(ex.getMessage(), containsString("No ML node to run on"));
+
+
+        ExplainDataFrameAnalyticsAction.Response response = client().execute(
+            ExplainDataFrameAnalyticsAction.INSTANCE,
+            new PutDataFrameAnalyticsAction.Request(dataFrameAnalyticsConfig.setId(analyticsId).setAllowLazyStart(true).buildForExplain()))
+            .actionGet();
+
+        assertThat(response.getMemoryEstimation().getExpectedMemoryWithoutDisk(), equalTo(DEFAULT_MODEL_MEMORY_LIMIT));
+        assertThat(response.getMemoryEstimation().getExpectedMemoryWithDisk(), equalTo(DEFAULT_MODEL_MEMORY_LIMIT));
+    }
+
+    public void testExplainWithLazyMlNodes() {
+        String indexName = "data";
+        createIndex(indexName);
+
+        client()
+            .admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setTransientSettings(Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 10_000))
+            .get();
+
+        String analyticsId = "not-lazy-dfa-with-lazy-nodes";
+        DataFrameAnalyticsConfig.Builder dataFrameAnalyticsConfig = new DataFrameAnalyticsConfig
+            .Builder()
+            .setId(analyticsId)
+            .setSource(new DataFrameAnalyticsSource(new String[]{indexName}, null, null))
+            .setAnalysis(new OutlierDetection.Builder().build())
+            .setDest(new DataFrameAnalyticsDest("foo", null));
+
+        ExplainDataFrameAnalyticsAction.Response response = client().execute(
+            ExplainDataFrameAnalyticsAction.INSTANCE,
+            new PutDataFrameAnalyticsAction.Request(dataFrameAnalyticsConfig.setId(analyticsId).buildForExplain()))
+            .actionGet();
+
+        assertThat(response.getMemoryEstimation().getExpectedMemoryWithoutDisk(), equalTo(DEFAULT_MODEL_MEMORY_LIMIT));
+        assertThat(response.getMemoryEstimation().getExpectedMemoryWithDisk(), equalTo(DEFAULT_MODEL_MEMORY_LIMIT));
+    }
+
+    private void createIndex(String indexName) {
+        client().admin().indices().prepareCreate(indexName).get();
+        client().prepareIndex(indexName)
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .setSource("{\"field\": 1, \"other\": 2}", XContentType.JSON)
+            .get();
+    }
+}

+ 47 - 12
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportExplainDataFrameAnalyticsAction.java

@@ -5,6 +5,8 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.support.ActionFilters;
@@ -16,6 +18,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.license.LicenseUtils;
@@ -30,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.explain.FieldSelection;
 import org.elasticsearch.xpack.core.ml.dataframe.explain.MemoryEstimation;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -44,6 +48,7 @@ import java.util.Objects;
 import java.util.Optional;
 
 import static org.elasticsearch.xpack.core.ClientHelper.filterSecurityHeaders;
+import static org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig.DEFAULT_MODEL_MEMORY_LIMIT;
 import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;
 
 /**
@@ -53,6 +58,7 @@ import static org.elasticsearch.xpack.ml.utils.SecondaryAuthorizationUtils.useSe
 public class TransportExplainDataFrameAnalyticsAction
     extends HandledTransportAction<PutDataFrameAnalyticsAction.Request, ExplainDataFrameAnalyticsAction.Response> {
 
+    private static final Logger logger = LogManager.getLogger(TransportExplainDataFrameAnalyticsAction.class);
     private final XPackLicenseState licenseState;
     private final TransportService transportService;
     private final ClusterService clusterService;
@@ -60,6 +66,7 @@ public class TransportExplainDataFrameAnalyticsAction
     private final MemoryUsageEstimationProcessManager processManager;
     private final SecurityContext securityContext;
     private final ThreadPool threadPool;
+    private volatile int numLazyMLNodes;
 
     @Inject
     public TransportExplainDataFrameAnalyticsAction(TransportService transportService,
@@ -77,9 +84,15 @@ public class TransportExplainDataFrameAnalyticsAction
         this.licenseState = licenseState;
         this.processManager = Objects.requireNonNull(processManager);
         this.threadPool = threadPool;
+        this.numLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
         this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
             new SecurityContext(settings, threadPool.getThreadContext()) :
             null;
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setNumLazyMLNodes);
+    }
+
+    private void setNumLazyMLNodes(int value) {
+        this.numLazyMLNodes = value;
     }
 
     @Override
@@ -93,13 +106,15 @@ public class TransportExplainDataFrameAnalyticsAction
 
         DiscoveryNode localNode = clusterService.localNode();
         if (MachineLearning.isMlNode(localNode)) {
-            explain(task, request, listener);
+            explain(task, request, true, listener);
         } else {
-            redirectToMlNode(request, listener);
+            redirectToMlNode(task, request, listener);
         }
     }
 
-    private void explain(Task task, PutDataFrameAnalyticsAction.Request request,
+    private void explain(Task task,
+                         PutDataFrameAnalyticsAction.Request request,
+                         boolean shouldEstimateMemory,
                          ActionListener<ExplainDataFrameAnalyticsAction.Response> listener) {
 
         final ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(
@@ -115,7 +130,7 @@ public class TransportExplainDataFrameAnalyticsAction
                 extractedFieldsDetectorFactory.createFromSource(
                     config,
                     ActionListener.wrap(
-                        extractedFieldsDetector -> explain(task, config, extractedFieldsDetector, listener),
+                        extractedFieldsDetector -> explain(task, config, extractedFieldsDetector, shouldEstimateMemory, listener),
                         listener::onFailure
                     )
                 );
@@ -124,7 +139,7 @@ public class TransportExplainDataFrameAnalyticsAction
             extractedFieldsDetectorFactory.createFromSource(
                 request.getConfig(),
                 ActionListener.wrap(
-                    extractedFieldsDetector -> explain(task, request.getConfig(), extractedFieldsDetector, listener),
+                    extractedFieldsDetector -> explain(task, request.getConfig(), extractedFieldsDetector, shouldEstimateMemory, listener),
                     listener::onFailure
                 )
             );
@@ -132,9 +147,31 @@ public class TransportExplainDataFrameAnalyticsAction
 
     }
 
-    private void explain(Task task, DataFrameAnalyticsConfig config, ExtractedFieldsDetector extractedFieldsDetector,
+    private void explain(Task task,
+                         DataFrameAnalyticsConfig config,
+                         ExtractedFieldsDetector extractedFieldsDetector,
+                         boolean shouldEstimateMemory,
                          ActionListener<ExplainDataFrameAnalyticsAction.Response> listener) {
         Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
+        if (fieldExtraction.v1().getAllFields().isEmpty()) {
+            listener.onResponse(new ExplainDataFrameAnalyticsAction.Response(
+                fieldExtraction.v2(),
+                new MemoryEstimation(ByteSizeValue.ZERO, ByteSizeValue.ZERO)
+            ));
+            return;
+        }
+        if (shouldEstimateMemory == false) {
+            String warning =  Messages.getMessage(
+                Messages.DATA_FRAME_ANALYTICS_AUDIT_UNABLE_TO_ESTIMATE_MEMORY_USAGE,
+                config.getModelMemoryLimit());
+            logger.warn("[{}] {}", config.getId(), warning);
+            HeaderWarning.addWarning(warning);
+            listener.onResponse(new ExplainDataFrameAnalyticsAction.Response(
+                fieldExtraction.v2(),
+                new MemoryEstimation(DEFAULT_MODEL_MEMORY_LIMIT, DEFAULT_MODEL_MEMORY_LIMIT)
+            ));
+            return;
+        }
 
         ActionListener<MemoryEstimation> memoryEstimationListener = ActionListener.wrap(
             memoryEstimation -> listener.onResponse(new ExplainDataFrameAnalyticsAction.Response(fieldExtraction.v2(), memoryEstimation)),
@@ -153,11 +190,6 @@ public class TransportExplainDataFrameAnalyticsAction
                                      DataFrameAnalyticsConfig config,
                                      ExtractedFields extractedFields,
                                      ActionListener<MemoryEstimation> listener) {
-        if (extractedFields.getAllFields().isEmpty()) {
-            listener.onResponse(new MemoryEstimation(ByteSizeValue.ZERO, ByteSizeValue.ZERO));
-            return;
-        }
-
         final String estimateMemoryTaskId = "memory_usage_estimation_" + task.getId();
         DataFrameDataExtractorFactory extractorFactory = DataFrameDataExtractorFactory.createForSourceIndices(
             new ParentTaskAssigningClient(client, task.getParentTaskId()), estimateMemoryTaskId, config, extractedFields);
@@ -176,12 +208,15 @@ public class TransportExplainDataFrameAnalyticsAction
     /**
      * Finds the first available ML node in the cluster and redirects the request to this node.
      */
-    private void redirectToMlNode(PutDataFrameAnalyticsAction.Request request,
+    private void redirectToMlNode(Task task,
+                                  PutDataFrameAnalyticsAction.Request request,
                                   ActionListener<ExplainDataFrameAnalyticsAction.Response> listener) {
         Optional<DiscoveryNode> node = findMlNode(clusterService.state());
         if (node.isPresent()) {
             transportService.sendRequest(node.get(), actionName, request,
                 new ActionListenerResponseHandler<>(listener, ExplainDataFrameAnalyticsAction.Response::new));
+        } else if (numLazyMLNodes > 0 || request.getConfig().isAllowLazyStart()) {
+            explain(task, request, false, listener);
         } else {
             listener.onFailure(ExceptionsHelper.badRequestException("No ML node to run on"));
         }