Browse Source

[ML] Introduce a setting for the process connect timeout (#43234)

This change introduces a new setting,
xpack.ml.process_connect_timeout, to enable
the timeout for one of the external ML processes
to connect to the ES JVM to be increased.

The timeout may need to be increased if many
processes are being started simultaneously on
the same machine. This is unlikely in clusters
with many ML nodes, as we balance the processes
across the ML nodes, but can happen in clusters
with a single ML node and a high value for
xpack.ml.node_concurrent_job_allocations.
David Roberts 6 years ago
parent
commit
76ad7d8464

+ 8 - 0
docs/reference/settings/ml-settings.asciidoc

@@ -109,3 +109,11 @@ cluster and the job is assigned to run on that node.
 IMPORTANT: This setting assumes some external process is capable of adding ML nodes
 to the cluster. This setting is only useful when used in conjunction with
 such an external process.
+
+`xpack.ml.process_connect_timeout` (<<cluster-update-settings,Dynamic>>)::
+The connection timeout for {ml} processes that run separately from the {es} JVM.
+Defaults to `10s`. Some {ml} processing is done by processes that run separately
+to the {es} JVM. When such processes are started they must connect to the {es}
+JVM. If such a process does not connect within the time period specified by this
+setting then the process is assumed to have failed. Defaults to `10s`. The minimum
+value for this setting is `5s`.

+ 7 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -328,6 +328,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
     public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
             Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);
 
+    public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT =
+        Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10),
+            TimeValue.timeValueSeconds(5), Setting.Property.Dynamic, Setting.Property.NodeScope);
+
     // Undocumented setting for integration test purposes
     public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
         Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope);
@@ -363,6 +367,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
     public List<Setting<?>> getSettings() {
         return List.of(
                 MachineLearningField.AUTODETECT_PROCESS,
+                PROCESS_CONNECT_TIMEOUT,
                 ML_ENABLED,
                 CONCURRENT_JOB_ALLOCATIONS,
                 MachineLearningField.MAX_MODEL_MEMORY_LIMIT,
@@ -477,8 +482,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                     nativeController,
                     client,
                     clusterService);
-                normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
-                analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController);
+                normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController, clusterService);
+                analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController, clusterService);
                 mlController = nativeController;
             } catch (IOException e) {
                 // The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so

+ 13 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

@@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ml.dataframe.process;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.NativeController;
 import org.elasticsearch.xpack.ml.process.ProcessPipes;
 import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@@ -28,14 +31,21 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {
     private static final Logger LOGGER = LogManager.getLogger(NativeAnalyticsProcessFactory.class);
 
     private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
-    public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
 
     private final Environment env;
     private final NativeController nativeController;
+    private volatile Duration processConnectTimeout;
 
-    public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController) {
+    public NativeAnalyticsProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
         this.env = Objects.requireNonNull(env);
         this.nativeController = Objects.requireNonNull(nativeController);
+        setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
+            this::setProcessConnectTimeout);
+    }
+
+    void setProcessConnectTimeout(TimeValue processConnectTimeout) {
+        this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
     }
 
     @Override
@@ -74,7 +84,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {
                 filesToDelete);
         try {
             analyticsBuilder.build();
-            processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
+            processPipes.connectStreams(processConnectTimeout);
         } catch (IOException e) {
             String msg = "Failed to launch data frame analytics process for job " + jobId;
             LOGGER.error(msg);

+ 12 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
@@ -37,13 +38,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
 
     private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcessFactory.class);
     private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
-    public static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
 
     private final Client client;
     private final Environment env;
     private final Settings settings;
     private final NativeController nativeController;
     private final ClusterService clusterService;
+    private volatile Duration processConnectTimeout;
 
     public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
                                           ClusterService clusterService) {
@@ -52,6 +53,13 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
         this.nativeController = Objects.requireNonNull(nativeController);
         this.client = client;
         this.clusterService = clusterService;
+        setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(settings));
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
+            this::setProcessConnectTimeout);
+    }
+
+    void setProcessConnectTimeout(TimeValue processConnectTimeout) {
+        this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
     }
 
     @Override
@@ -88,8 +96,8 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
         }
     }
 
-    private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
-                                     List<Path> filesToDelete) {
+    void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
+                             List<Path> filesToDelete) {
         try {
 
             Settings updatedSettings = Settings.builder()
@@ -109,7 +117,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
                 autodetectBuilder.quantiles(autodetectParams.quantiles());
             }
             autodetectBuilder.build();
-            processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
+            processPipes.connectStreams(processConnectTimeout);
         } catch (IOException e) {
             String msg = "Failed to launch autodetect for job " + job.getId();
             LOGGER.error(msg);

+ 13 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

@@ -7,10 +7,13 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.NativeController;
 import org.elasticsearch.xpack.ml.process.ProcessPipes;
 import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
@@ -25,14 +28,21 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
 
     private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class);
     private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
-    private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10);
 
     private final Environment env;
     private final NativeController nativeController;
+    private volatile Duration processConnectTimeout;
 
-    public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) {
+    public NativeNormalizerProcessFactory(Environment env, NativeController nativeController, ClusterService clusterService) {
         this.env = Objects.requireNonNull(env);
         this.nativeController = Objects.requireNonNull(nativeController);
+        setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
+            this::setProcessConnectTimeout);
+    }
+
+    void setProcessConnectTimeout(TimeValue processConnectTimeout) {
+        this.processConnectTimeout = Duration.ofMillis(processConnectTimeout.getMillis());
     }
 
     @Override
@@ -64,7 +74,7 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
             List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
             processPipes.addArgs(command);
             nativeController.startProcess(command);
-            processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
+            processPipes.connectStreams(processConnectTimeout);
         } catch (IOException e) {
             String msg = "Failed to launch normalizer for job " + jobId;
             LOGGER.error(msg);

+ 61 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactoryTests.java

@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.process.autodetect;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.TestEnvironment;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.ProcessPipes;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Set;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class NativeAutodetectProcessFactoryTests extends ESTestCase {
+
+    public void testSetProcessConnectTimeout() throws IOException {
+
+        int timeoutSeconds = randomIntBetween(5, 100);
+
+        Settings settings = Settings.builder()
+            .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
+            .build();
+        Environment env = TestEnvironment.newEnvironment(settings);
+        NativeController nativeController = mock(NativeController.class);
+        Client client = mock(Client.class);
+        ClusterSettings clusterSettings = new ClusterSettings(settings,
+            Set.of(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC));
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+        Job job = mock(Job.class);
+        when(job.getId()).thenReturn("set_process_connect_test_job");
+        AutodetectParams autodetectParams = mock(AutodetectParams.class);
+        ProcessPipes processPipes = mock(ProcessPipes.class);
+
+        NativeAutodetectProcessFactory nativeAutodetectProcessFactory =
+            new NativeAutodetectProcessFactory(env, settings, nativeController, client, clusterService);
+        nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds));
+        nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());
+
+        verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds)));
+    }
+}