Răsfoiți Sursa

[ML] Construct ML native controller in standard place (#43570)

The ML native controller used to be used within the
ML feature set which necessitated constructing it
outside of the usual createComponents() method.
Now that the ML native code info is reported by the
ML info action rather than by the feature set it is
possible to move construction to the standard place,
which avoids surprises for maintainers.
David Roberts 6 ani în urmă
părinte
comite
84a7b1cc6b
18 a modificat fișierele cu 161 adăugiri și 186 ștergeri
  1. 10 8
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  2. 0 29
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportAction.java
  3. 17 31
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java
  4. 4 18
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java
  5. 6 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java
  6. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java
  7. 5 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java
  8. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java
  9. 4 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java
  10. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java
  11. 6 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java
  12. 26 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/DummyController.java
  13. 36 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlController.java
  14. 28 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlControllerHolder.java
  15. 10 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java
  16. 0 60
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java
  17. 0 16
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java
  18. 6 5
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java

+ 10 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -210,9 +210,11 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcess
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
 import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
+import org.elasticsearch.xpack.ml.process.DummyController;
+import org.elasticsearch.xpack.ml.process.MlController;
+import org.elasticsearch.xpack.ml.process.MlControllerHolder;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 import org.elasticsearch.xpack.ml.process.NativeController;
-import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
 import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
 import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
 import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction;
@@ -462,16 +464,13 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
 
         NativeStorageProvider nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
 
+        MlController mlController;
         AutodetectProcessFactory autodetectProcessFactory;
         NormalizerProcessFactory normalizerProcessFactory;
         AnalyticsProcessFactory analyticsProcessFactory;
-        if (MachineLearningField.AUTODETECT_PROCESS.get(settings) && MachineLearningInfoTransportAction.isRunningOnMlPlatform(true)) {
+        if (MachineLearningField.AUTODETECT_PROCESS.get(settings)) {
             try {
-                NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
-                if (nativeController == null) {
-                    // This will only only happen when path.home is not set, which is disallowed in production
-                    throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
-                }
+                NativeController nativeController = NativeController.makeNativeController(clusterService.getNodeName(), environment);
                 autodetectProcessFactory = new NativeAutodetectProcessFactory(
                     environment,
                     settings,
@@ -480,6 +479,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                     clusterService);
                 normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
                 analyticsProcessFactory = new NativeAnalyticsProcessFactory(environment, nativeController);
+                mlController = nativeController;
             } catch (IOException e) {
                 // The low level cause of failure from the named pipe helper's perspective is almost never the real root cause, so
                 // only log this at the lowest level of detail.  It's almost always "file not found" on a named pipe we expect to be
@@ -491,6 +491,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                     + XPackSettings.MACHINE_LEARNING_ENABLED.getKey() + ": false].");
             }
         } else {
+            mlController = new DummyController();
             autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
                     new BlackHoleAutodetectProcess(job.getId());
             // factor of 1.0 makes renormalization a no-op
@@ -521,7 +522,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
         MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider,
             dataFrameAnalyticsConfigProvider);
         this.memoryTracker.set(memoryTracker);
-        MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
+        MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(clusterService, datafeedManager, mlController,
             autodetectProcessManager, memoryTracker);
 
         // this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
@@ -534,6 +535,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
 
         return Arrays.asList(
                 mlLifeCycleService,
+                new MlControllerHolder(mlController),
                 jobResultsProvider,
                 jobConfigProvider,
                 datafeedConfigProvider,

+ 0 - 29
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportAction.java

@@ -5,30 +5,18 @@
  */
 package org.elasticsearch.xpack.ml;
 
-import org.apache.lucene.util.Constants;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.plugins.Platforms;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
 import org.elasticsearch.xpack.core.action.XPackInfoFeatureTransportAction;
 
-import java.util.Arrays;
-import java.util.List;
-
 public class MachineLearningInfoTransportAction extends XPackInfoFeatureTransportAction {
 
-    /**
-     * List of platforms for which the native processes are available
-     */
-    private static final List<String> mlPlatforms =
-        Arrays.asList("darwin-x86_64", "linux-x86_64", "windows-x86_64");
-
     private final boolean enabled;
     private final XPackLicenseState licenseState;
 
@@ -40,23 +28,6 @@ public class MachineLearningInfoTransportAction extends XPackInfoFeatureTranspor
         this.licenseState = licenseState;
     }
 
-    // TODO: remove these methods
-    static boolean isRunningOnMlPlatform(boolean fatalIfNot) {
-        return isRunningOnMlPlatform(Constants.OS_NAME, Constants.OS_ARCH, fatalIfNot);
-    }
-
-    static boolean isRunningOnMlPlatform(String osName, String osArch, boolean fatalIfNot) {
-        String platformName = Platforms.platformName(osName, osArch);
-        if (mlPlatforms.contains(platformName)) {
-            return true;
-        }
-        if (fatalIfNot) {
-            throw new ElasticsearchException("X-Pack is not supported and Machine Learning is not available for [" + platformName
-                + "]; you can use the other X-Pack features (unsupported) by setting xpack.ml.enabled: false in elasticsearch.yml");
-        }
-        return false;
-    }
-
     @Override
     public String name() {
         return XPackField.MACHINE_LEARNING;

+ 17 - 31
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

@@ -7,30 +7,27 @@ package org.elasticsearch.xpack.ml;
 
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.LifecycleListener;
-import org.elasticsearch.env.Environment;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
+import org.elasticsearch.xpack.ml.process.MlController;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
-import org.elasticsearch.xpack.ml.process.NativeController;
-import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 
 import java.io.IOException;
+import java.util.Objects;
 
 public class MlLifeCycleService {
 
-    private final Environment environment;
-    private final ClusterService clusterService;
     private final DatafeedManager datafeedManager;
+    private final MlController mlController;
     private final AutodetectProcessManager autodetectProcessManager;
     private final MlMemoryTracker memoryTracker;
 
-    public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedManager datafeedManager,
-                              AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) {
-        this.environment = environment;
-        this.clusterService = clusterService;
-        this.datafeedManager = datafeedManager;
-        this.autodetectProcessManager = autodetectProcessManager;
-        this.memoryTracker = memoryTracker;
+    MlLifeCycleService(ClusterService clusterService, DatafeedManager datafeedManager, MlController mlController,
+                       AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker) {
+        this.datafeedManager = Objects.requireNonNull(datafeedManager);
+        this.mlController = Objects.requireNonNull(mlController);
+        this.autodetectProcessManager = Objects.requireNonNull(autodetectProcessManager);
+        this.memoryTracker = Objects.requireNonNull(memoryTracker);
         clusterService.addLifecycleListener(new LifecycleListener() {
             @Override
             public void beforeStop() {
@@ -41,27 +38,16 @@ public class MlLifeCycleService {
 
     public synchronized void stop() {
         try {
-            if (MachineLearningInfoTransportAction.isRunningOnMlPlatform(false)) {
-                // This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the
-                // datafeeds, so they get reallocated.  We have to do this first, otherwise the datafeeds
-                // could fail if they send data to a dead autodetect process.
-                if (datafeedManager != null) {
-                    datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
-                }
-                NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), environment);
-                if (nativeController != null) {
-                    // This kills autodetect processes WITHOUT closing the jobs, so they get reallocated.
-                    if (autodetectProcessManager != null) {
-                        autodetectProcessManager.killAllProcessesOnThisNode();
-                    }
-                    nativeController.stop();
-                }
-            }
+            // This prevents datafeeds from sending data to autodetect processes WITHOUT stopping the
+            // datafeeds, so they get reassigned.  We have to do this first, otherwise the datafeeds
+            // could fail if they send data to a dead autodetect process.
+            datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
+            // This kills autodetect processes WITHOUT closing the jobs, so they get reassigned.
+            autodetectProcessManager.killAllProcessesOnThisNode();
+            mlController.stop();
         } catch (IOException e) {
             // We're stopping anyway, so don't let this complicate the shutdown sequence
         }
-        if (memoryTracker != null) {
-            memoryTracker.stop();
-        }
+        memoryTracker.stop();
     }
 }

+ 4 - 18
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java

@@ -12,7 +12,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.env.Environment;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;
@@ -21,11 +20,8 @@ import org.elasticsearch.xpack.core.ml.action.MlInfoAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
-import org.elasticsearch.xpack.ml.process.NativeController;
-import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
+import org.elasticsearch.xpack.ml.process.MlControllerHolder;
 
-import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
@@ -38,23 +34,13 @@ public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.R
 
     @Inject
     public TransportMlInfoAction(TransportService transportService, ActionFilters actionFilters,
-                                 ClusterService clusterService, Environment env) {
+                                 ClusterService clusterService, MlControllerHolder mlControllerHolder) {
         super(MlInfoAction.NAME, transportService, actionFilters, (Supplier<MlInfoAction.Request>) MlInfoAction.Request::new);
+
         this.clusterService = clusterService;
 
         try {
-            NativeController nativeController = NativeControllerHolder.getNativeController(clusterService.getNodeName(), env);
-            // TODO: this leniency is only for tests. it can be removed when NativeController is created as a component and
-            // becomes a ctor arg to this action
-            if (nativeController != null) {
-                nativeCodeInfo = nativeController.getNativeCodeInfo();
-            } else {
-                nativeCodeInfo = Collections.emptyMap();
-            }
-        } catch (IOException e) {
-            // this should not be possible since this action is only registered when ML is enabled,
-            // and the MachineLearning plugin would have failed to create components
-            throw new IllegalStateException("native controller failed to load", e);
+            nativeCodeInfo = mlControllerHolder.getMlController().getNativeCodeInfo();
         } catch (TimeoutException e) {
             throw new RuntimeException("Could not get native code info from native controller", e);
         }

+ 6 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcess.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.dataframe.process;
 
 import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
+import org.elasticsearch.xpack.ml.process.NativeController;
 import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
 
 import java.io.IOException;
@@ -22,10 +23,11 @@ public class NativeAnalyticsProcess extends AbstractNativeProcess implements Ana
 
     private final ProcessResultsParser<AnalyticsResult> resultsParser = new ProcessResultsParser<>(AnalyticsResult.PARSER);
 
-    protected NativeAnalyticsProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
-                                     OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
-                                     Consumer<String> onProcessCrash) {
-        super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash);
+    protected NativeAnalyticsProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
+                                     InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
+                                     List<Path> filesToDelete, Consumer<String> onProcessCrash) {
+        super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete,
+            onProcessCrash);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

@@ -50,7 +50,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory {
 
         createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);
 
-        NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(),
+        NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, nativeController, processPipes.getLogStream().get(),
                 processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), null, numberOfFields,
                 filesToDelete, reason -> {});
 

+ 5 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java

@@ -21,6 +21,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectContro
 import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
 import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
 import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
+import org.elasticsearch.xpack.ml.process.NativeController;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -41,10 +42,11 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
 
     private final ProcessResultsParser<AutodetectResult> resultsParser;
 
-    NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
-                            OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
+    NativeAutodetectProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
+                            InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
                             ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash) {
-        super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash);
+        super(jobId, nativeController, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete,
+            onProcessCrash);
         this.resultsParser = resultsParser;
     }
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

@@ -72,7 +72,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
         AutodetectStateProcessor stateProcessor = new AutodetectStateProcessor(client, job.getId());
         ProcessResultsParser<AutodetectResult> resultsParser = new ProcessResultsParser<>(AutodetectResult.PARSER);
         NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
-                job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
+                job.getId(), nativeController, processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
                 processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields,
                 filesToDelete, resultsParser, onProcessCrash);
         try {

+ 4 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;
 
 import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResultHandler;
 import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
+import org.elasticsearch.xpack.ml.process.NativeController;
 
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -19,8 +20,9 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
 
     private static final String NAME = "normalizer";
 
-    NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) {
-        super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {});
+    NativeNormalizerProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
+                            InputStream processOutStream) {
+        super(jobId, nativeController, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {});
     }
 
     @Override

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

@@ -42,7 +42,7 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
                 true, false, true, true, false, false);
         createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
 
-        NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(),
+        NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes.getLogStream().get(),
                 processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get());
 
         try {

+ 6 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java

@@ -42,6 +42,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
     private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000);
 
     private final String jobId;
+    private final NativeController nativeController;
     private final CppLogMessageHandler cppLogHandler;
     private final OutputStream processInStream;
     private final InputStream processOutStream;
@@ -57,10 +58,11 @@ public abstract class AbstractNativeProcess implements NativeProcess {
     private volatile boolean processKilled;
     private volatile boolean isReady;
 
-    protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
-                                    OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
-                                    Consumer<String> onProcessCrash) {
+    protected AbstractNativeProcess(String jobId, NativeController nativeController, InputStream logStream, OutputStream processInStream,
+                                    InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
+                                    List<Path> filesToDelete, Consumer<String> onProcessCrash) {
         this.jobId = jobId;
+        this.nativeController = nativeController;
         cppLogHandler = new CppLogMessageHandler(jobId, logStream);
         this.processInStream = new BufferedOutputStream(processInStream);
         this.processOutStream = processOutStream;
@@ -183,7 +185,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
             // The PID comes via the processes log stream.  We don't wait for it to arrive here,
             // but if the wait times out it implies the process has only just started, in which
             // case it should die very quickly when we close its input stream.
-            NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO));
+            nativeController.killProcess(cppLogHandler.getPid(Duration.ZERO));
 
             // Wait for the process to die before closing processInStream as if the process
             // is still alive when processInStream is closed it may start persisting state

+ 26 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/DummyController.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.process;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A dummy ML controller for use in internal cluster tests where it
+ * is not possible/appropriate to start a native controller.
+ */
+public class DummyController implements MlController {
+
+    @Override
+    public Map<String, Object> getNativeCodeInfo() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void stop() {
+        // no-op
+    }
+}

+ 36 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlController.java

@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.process;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Interface for the controller that ML uses to start native processes.
+ */
+public interface MlController {
+
+    /**
+     * Get build/version information about the native code being used by the ML plugin.
+     * @return A map containing native code build/version information.
+     *         This map will be empty in the case where no native code exists.
+     * @throws TimeoutException If native code information cannot be obtained
+     *                          within a reasonable amount of time.
+     */
+    Map<String, Object> getNativeCodeInfo() throws TimeoutException;
+
+    /**
+     * Stop the controller.  For implementations where the controller is an external
+     * process this will instruct that external process to exit, thus preventing any
+     * subsequent controller operations from working.  Stopping the controller is
+     * irreversible; the only way to restart a controller is to restart the
+     * Elasticsearch node.
+     * @throws IOException If it is not possible to communicate the stop request to
+     *                     the controller.
+     */
+    void stop() throws IOException;
+}

+ 28 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlControllerHolder.java

@@ -0,0 +1,28 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.process;
+
+import org.elasticsearch.node.Node;
+
+import java.util.Objects;
+
+/**
+ * Wrapper for the {@link MlController} interface that allows it to be used
+ * given the way {@link Node} does Guice bindings for plugin components.
+ * TODO: remove this class entirely once Guice is removed entirely.
+ */
+public class MlControllerHolder {
+
+    private MlController mlController;
+
+    public MlControllerHolder(MlController mlController) {
+        this.mlController = Objects.requireNonNull(mlController);
+    }
+
+    public MlController getMlController() {
+        return mlController;
+    }
+}

+ 10 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java

@@ -26,7 +26,7 @@ import java.util.concurrent.TimeoutException;
  * Maintains the connection to the native controller daemon that can start other processes.
  */
 @SuppressWarnings("ALL")
-public class NativeController {
+public class NativeController implements MlController {
     private static final Logger LOGGER = LogManager.getLogger(NativeController.class);
 
     /**
@@ -41,12 +41,17 @@ public class NativeController {
     private static final String START_COMMAND = "start";
     private static final String KILL_COMMAND = "kill";
 
-    public static final Map<String, Object> UNKNOWN_NATIVE_CODE_INFO = Map.of("version", "N/A", "build_hash", "N/A");
-
     private final String localNodeName;
     private final CppLogMessageHandler cppLogHandler;
     private final OutputStream commandStream;
 
+    public static NativeController makeNativeController(String localNodeName, Environment env)
+        throws IOException {
+        NativeController nativeController = new NativeController(localNodeName, env, new NamedPipeHelper());
+        nativeController.tailLogsInThread();
+        return nativeController;
+    }
+
     NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
         ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null,
                 true, true, false, false, false, false);
@@ -80,6 +85,7 @@ public class NativeController {
         return cppLogHandler.getPid(CONTROLLER_CONNECT_TIMEOUT);
     }
 
+    @Override
     public Map<String, Object> getNativeCodeInfo() throws TimeoutException {
         return cppLogHandler.getNativeCodeInfo(CONTROLLER_CONNECT_TIMEOUT);
     }
@@ -143,6 +149,7 @@ public class NativeController {
         }
     }
 
+    @Override
     public void stop() throws IOException {
         // The C++ process will exit when it gets EOF on the command stream
         commandStream.close();

+ 0 - 60
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java

@@ -1,60 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.ml.process;
-
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.xpack.core.ml.MachineLearningField;
-import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
-
-import java.io.IOException;
-
-/**
- * Manages a singleton NativeController so that both the MachineLearningInfoTransportAction and MachineLearning classes can
- * get access to the same one.
- */
-public class NativeControllerHolder {
-
-    private static final Object lock = new Object();
-    private static NativeController nativeController;
-
-    private NativeControllerHolder() {
-    }
-
-    /**
-     * Get a reference to the singleton native process controller.
-     *
-     * The NativeController is created lazily to allow time for the C++ process to be started before connection is attempted.
-     *
-     * <code>null</code> is returned to tests where xpack.ml.autodetect_process=false.
-     *
-     * Calls may throw an exception if initial connection to the C++ process fails.
-     */
-    public static NativeController getNativeController(String localNodeName, Environment environment) throws IOException {
-
-        if (MachineLearningField.AUTODETECT_PROCESS.get(environment.settings())) {
-            synchronized (lock) {
-                if (nativeController == null) {
-                    nativeController = new NativeController(localNodeName, environment, new NamedPipeHelper());
-                    nativeController.tailLogsInThread();
-                }
-            }
-            return nativeController;
-        }
-        return null;
-    }
-
-    /**
-     * Get a reference to the singleton native process controller.
-     *
-     * Assumes that if it is possible for a native controller to exist that it will already have been created.
-     * Designed for use by objects that don't have access to the environment but know a native controller must exist
-     * for the object calling this method to exist.
-     */
-    public static NativeController getNativeController() {
-        return nativeController;
-    }
-
-}

+ 0 - 16
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java

@@ -5,7 +5,6 @@
  */
 package org.elasticsearch.xpack.ml;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
@@ -104,21 +103,6 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase {
             TestEnvironment.newEnvironment(settings), client, licenseState, jobManagerHolder);
     }
 
-    public void testIsRunningOnMlPlatform() {
-        assertTrue(MachineLearningInfoTransportAction.isRunningOnMlPlatform("Linux", "amd64", true));
-        assertTrue(MachineLearningInfoTransportAction.isRunningOnMlPlatform("Windows 10", "amd64", true));
-        assertTrue(MachineLearningInfoTransportAction.isRunningOnMlPlatform("Mac OS X", "x86_64", true));
-        assertFalse(MachineLearningInfoTransportAction.isRunningOnMlPlatform("Linux", "i386", false));
-        assertFalse(MachineLearningInfoTransportAction.isRunningOnMlPlatform("Windows 10", "i386", false));
-        assertFalse(MachineLearningInfoTransportAction.isRunningOnMlPlatform("SunOS", "amd64", false));
-        expectThrows(ElasticsearchException.class,
-                () -> MachineLearningInfoTransportAction.isRunningOnMlPlatform("Linux", "i386", true));
-        expectThrows(ElasticsearchException.class,
-                () -> MachineLearningInfoTransportAction.isRunningOnMlPlatform("Windows 10", "i386", true));
-        expectThrows(ElasticsearchException.class,
-                () -> MachineLearningInfoTransportAction.isRunningOnMlPlatform("SunOS", "amd64", true));
-    }
-
     public void testAvailable() throws Exception {
         MachineLearningInfoTransportAction featureSet = new MachineLearningInfoTransportAction(
             mock(TransportService.class), mock(ActionFilters.class), commonSettings, licenseState);

+ 6 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
 import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter;
 import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
 import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
+import org.elasticsearch.xpack.ml.process.NativeController;
 import org.junit.Assert;
 import org.junit.Before;
 
@@ -57,7 +58,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         when(logStream.read(new byte[1024])).thenReturn(-1);
         InputStream outputStream = mock(InputStream.class);
         when(outputStream.read(new byte[512])).thenReturn(-1);
-        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
                 mock(OutputStream.class), outputStream, mock(OutputStream.class),
                 NUMBER_FIELDS, null,
                 new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) {
@@ -80,7 +81,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         when(outputStream.read(new byte[512])).thenReturn(-1);
         String[] record = {"r1", "r2", "r3", "r4", "r5"};
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
-        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
                 bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
                 new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) {
             process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
@@ -114,7 +115,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         InputStream outputStream = mock(InputStream.class);
         when(outputStream.read(new byte[512])).thenReturn(-1);
         ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
-        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
                 bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
                 new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) {
             process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));
@@ -147,7 +148,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         String json = "some string of data";
         ByteArrayInputStream processOutStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
 
-        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
             processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
             new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER), mock(Consumer.class))) {
 
@@ -162,7 +163,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         InputStream outputStream = mock(InputStream.class);
         when(outputStream.read(new byte[512])).thenReturn(-1);
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
-        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", mock(NativeController.class), logStream,
                 bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
                 new ProcessResultsParser<>(AutodetectResult.PARSER), mock(Consumer.class))) {
             process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class));